From c73878fc5ed157845befee0afcb65519782ec4c5 Mon Sep 17 00:00:00 2001 From: Luke Kingland Date: Thu, 20 Feb 2025 11:34:45 +0900 Subject: [PATCH 1/5] fix formatting --- src/func_python/http.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/func_python/http.py b/src/func_python/http.py index 9394b2b9..00cad4a2 100644 --- a/src/func_python/http.py +++ b/src/func_python/http.py @@ -49,7 +49,7 @@ def __init__(self, f): self.f = f self.stop_event = asyncio.Event() if hasattr(self.f, "handle") is not True: - raise AttributeError( "Function must implement a 'handle' method.") + raise AttributeError("Function must implement a 'handle' method.") # Inform the user via logs that defaults will be used for health # endpoints if no matchin methods were provided. From d124770ab62e83575a71bfd495b46cf7e448ea24 Mon Sep 17 00:00:00 2001 From: Luke Kingland Date: Thu, 27 Feb 2025 21:20:32 +0900 Subject: [PATCH 2/5] update readme --- README.md | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 1c6c8667..973ad6e5 100644 --- a/README.md +++ b/README.md @@ -7,12 +7,15 @@ Python as a network service. ``` . ├── cmd -│   └── fhttp - Example a Function using the http middleware +│   └── fhttp - Example Function using the http middleware +│   └── fce - Example Function using the CloudEvent middleware ├── src/func_python -│   ├── http.py - HTTP Middleware +│   ├── http.py - HTTP Middleware +│   ├── cloudevents.py - CloudEvent Middleware ├── tests -│   ├── http_test.py - HTTP Middleware tests -└── README.md - This Readme +│   ├── http_test.py - HTTP Middleware tests +│   ├── cloudevent_test.py - CloudEvent tests +└── README.md - This Readme ``` ## Development @@ -25,7 +28,7 @@ Run suite: To enable more granular log levels: `poetry run pytest --log-cli-level=INFO` -## Example Main +## Example Commands Minimal example of running the test command, which shows how this library is used when integrate with Functions, and can be useful during dev. From e7eabe14277a2020173b02bcd16a8e891869efde Mon Sep 17 00:00:00 2001 From: Luke Kingland Date: Fri, 28 Feb 2025 00:33:05 +0900 Subject: [PATCH 3/5] cloudevents middleware --- poetry.lock | 60 +++++++- pyproject.toml | 1 + src/func_python/cloudevent.py | 278 ++++++++++++++++++++++++++++++++++ src/func_python/example.py | 125 +++++++++++++++ tests/test_cloudevent.py | 226 +++++++++++++++++++++++++++ tests/test_http.py | 2 + 6 files changed, 689 insertions(+), 3 deletions(-) create mode 100644 src/func_python/cloudevent.py create mode 100644 src/func_python/example.py create mode 100644 tests/test_cloudevent.py diff --git a/poetry.lock b/poetry.lock index 8def4ee2..4121ba93 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.4 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.0.1 and should not be changed by hand. [[package]] name = "anyio" @@ -6,6 +6,7 @@ version = "4.7.0" description = "High level compatibility layer for multiple asynchronous event loop implementations" optional = false python-versions = ">=3.9" +groups = ["dev"] files = [ {file = "anyio-4.7.0-py3-none-any.whl", hash = "sha256:ea60c3723ab42ba6fff7e8ccb0488c898ec538ff4df1f1d5e642c3601d07e352"}, {file = "anyio-4.7.0.tar.gz", hash = "sha256:2f834749c602966b7d456a7567cafcb309f96482b5081d14ac93ccd457f9dd48"}, @@ -27,28 +28,65 @@ version = "2024.8.30" description = "Python package for providing Mozilla's CA Bundle." optional = false python-versions = ">=3.6" +groups = ["dev"] files = [ {file = "certifi-2024.8.30-py3-none-any.whl", hash = "sha256:922820b53db7a7257ffbda3f597266d435245903d80737e34f8a45ff3e3230d8"}, {file = "certifi-2024.8.30.tar.gz", hash = "sha256:bec941d2aa8195e248a60b31ff9f0558284cf01a52591ceda73ea9afffd69fd9"}, ] +[[package]] +name = "cloudevents" +version = "1.11.0" +description = "CloudEvents Python SDK" +optional = false +python-versions = "*" +groups = ["main"] +files = [ + {file = "cloudevents-1.11.0-py3-none-any.whl", hash = "sha256:77edb4f2b01f405c44ea77120c3213418dbc63d8859f98e9e85de875502b8a76"}, + {file = "cloudevents-1.11.0.tar.gz", hash = "sha256:5be990583e99f3b08af5a709460e20b25cb169270227957a20b47a6ec8635e66"}, +] + +[package.dependencies] +deprecation = ">=2.0,<3.0" + +[package.extras] +pydantic = ["pydantic (>=1.0.0,<3.0)"] + [[package]] name = "colorama" version = "0.4.6" description = "Cross-platform colored terminal text." optional = false python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7" +groups = ["dev"] +markers = "sys_platform == \"win32\"" files = [ {file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"}, {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"}, ] +[[package]] +name = "deprecation" +version = "2.1.0" +description = "A library to handle automated deprecations" +optional = false +python-versions = "*" +groups = ["main"] +files = [ + {file = "deprecation-2.1.0-py2.py3-none-any.whl", hash = "sha256:a10811591210e1fb0e768a8c25517cabeabcba6f0bf96564f8ff45189f90b14a"}, + {file = "deprecation-2.1.0.tar.gz", hash = "sha256:72b3bde64e5d778694b0cf68178aed03d15e15477116add3fb773e581f9518ff"}, +] + +[package.dependencies] +packaging = "*" + [[package]] name = "h11" version = "0.14.0" description = "A pure-Python, bring-your-own-I/O implementation of HTTP/1.1" optional = false python-versions = ">=3.7" +groups = ["main", "dev"] files = [ {file = "h11-0.14.0-py3-none-any.whl", hash = "sha256:e3fe4ac4b851c468cc8363d500db52c2ead036020723024a109d37346efaa761"}, {file = "h11-0.14.0.tar.gz", hash = "sha256:8f19fbbe99e72420ff35c00b27a34cb9937e902a8b810e2c88300c6f0a3b699d"}, @@ -60,6 +98,7 @@ version = "4.1.0" description = "HTTP/2 State-Machine based protocol implementation" optional = false python-versions = ">=3.6.1" +groups = ["main"] files = [ {file = "h2-4.1.0-py3-none-any.whl", hash = "sha256:03a46bcf682256c95b5fd9e9a99c1323584c3eec6440d379b9903d709476bc6d"}, {file = "h2-4.1.0.tar.gz", hash = "sha256:a83aca08fbe7aacb79fec788c9c0bac936343560ed9ec18b82a13a12c28d2abb"}, @@ -75,6 +114,7 @@ version = "4.0.0" description = "Pure-Python HPACK header compression" optional = false python-versions = ">=3.6.1" +groups = ["main"] files = [ {file = "hpack-4.0.0-py3-none-any.whl", hash = "sha256:84a076fad3dc9a9f8063ccb8041ef100867b1878b25ef0ee63847a5d53818a6c"}, {file = "hpack-4.0.0.tar.gz", hash = "sha256:fc41de0c63e687ebffde81187a948221294896f6bdc0ae2312708df339430095"}, @@ -86,6 +126,7 @@ version = "1.0.7" description = "A minimal low-level HTTP client." optional = false python-versions = ">=3.8" +groups = ["dev"] files = [ {file = "httpcore-1.0.7-py3-none-any.whl", hash = "sha256:a3fff8f43dc260d5bd363d9f9cf1830fa3a458b332856f34282de498ed420edd"}, {file = "httpcore-1.0.7.tar.gz", hash = "sha256:8551cb62a169ec7162ac7be8d4817d561f60e08eaa485234898414bb5a8a0b4c"}, @@ -107,6 +148,7 @@ version = "0.28.1" description = "The next generation HTTP client." optional = false python-versions = ">=3.8" +groups = ["dev"] files = [ {file = "httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad"}, {file = "httpx-0.28.1.tar.gz", hash = "sha256:75e98c5f16b0f35b567856f597f06ff2270a374470a5c2392242528e3e3e42fc"}, @@ -131,6 +173,7 @@ version = "0.17.3" description = "A ASGI Server based on Hyper libraries and inspired by Gunicorn" optional = false python-versions = ">=3.8" +groups = ["main"] files = [ {file = "hypercorn-0.17.3-py3-none-any.whl", hash = "sha256:059215dec34537f9d40a69258d323f56344805efb462959e727152b0aa504547"}, {file = "hypercorn-0.17.3.tar.gz", hash = "sha256:1b37802ee3ac52d2d85270700d565787ab16cf19e1462ccfa9f089ca17574165"}, @@ -154,6 +197,7 @@ version = "6.0.1" description = "HTTP/2 framing layer for Python" optional = false python-versions = ">=3.6.1" +groups = ["main"] files = [ {file = "hyperframe-6.0.1-py3-none-any.whl", hash = "sha256:0ec6bafd80d8ad2195c4f03aacba3a8265e57bc4cff261e802bf39970ed02a15"}, {file = "hyperframe-6.0.1.tar.gz", hash = "sha256:ae510046231dc8e9ecb1a6586f63d2347bf4c8905914aa84ba585ae85f28a914"}, @@ -165,6 +209,7 @@ version = "3.10" description = "Internationalized Domain Names in Applications (IDNA)" optional = false python-versions = ">=3.6" +groups = ["dev"] files = [ {file = "idna-3.10-py3-none-any.whl", hash = "sha256:946d195a0d259cbba61165e88e65941f16e9b36ea6ddb97f00452bae8b1287d3"}, {file = "idna-3.10.tar.gz", hash = "sha256:12f65c9b470abda6dc35cf8e63cc574b1c52b11df2c86030af0ac09b01b13ea9"}, @@ -179,6 +224,7 @@ version = "2.0.0" description = "brain-dead simple config-ini parsing" optional = false python-versions = ">=3.7" +groups = ["dev"] files = [ {file = "iniconfig-2.0.0-py3-none-any.whl", hash = "sha256:b6a85871a79d2e3b22d2d1b94ac2824226a63c6b741c88f7ae975f18b6778374"}, {file = "iniconfig-2.0.0.tar.gz", hash = "sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3"}, @@ -190,6 +236,7 @@ version = "24.2" description = "Core utilities for Python packages" optional = false python-versions = ">=3.8" +groups = ["main", "dev"] files = [ {file = "packaging-24.2-py3-none-any.whl", hash = "sha256:09abb1bccd265c01f4a3aa3f7a7db064b36514d2cba19a2f694fe6150451a759"}, {file = "packaging-24.2.tar.gz", hash = "sha256:c228a6dc5e932d346bc5739379109d49e8853dd8223571c7c5b55260edc0b97f"}, @@ -201,6 +248,7 @@ version = "1.5.0" description = "plugin and hook calling mechanisms for python" optional = false python-versions = ">=3.8" +groups = ["dev"] files = [ {file = "pluggy-1.5.0-py3-none-any.whl", hash = "sha256:44e1ad92c8ca002de6377e165f3e0f1be63266ab4d554740532335b9d75ea669"}, {file = "pluggy-1.5.0.tar.gz", hash = "sha256:2cffa88e94fdc978c4c574f15f9e59b7f4201d439195c3715ca9e2486f1d0cf1"}, @@ -216,6 +264,7 @@ version = "2.0.0" description = "A pure-Python implementation of the HTTP/2 priority tree" optional = false python-versions = ">=3.6.1" +groups = ["main"] files = [ {file = "priority-2.0.0-py3-none-any.whl", hash = "sha256:6f8eefce5f3ad59baf2c080a664037bb4725cd0a790d53d59ab4059288faf6aa"}, {file = "priority-2.0.0.tar.gz", hash = "sha256:c965d54f1b8d0d0b19479db3924c7c36cf672dbf2aec92d43fbdaf4492ba18c0"}, @@ -227,6 +276,7 @@ version = "8.3.4" description = "pytest: simple powerful testing with Python" optional = false python-versions = ">=3.8" +groups = ["dev"] files = [ {file = "pytest-8.3.4-py3-none-any.whl", hash = "sha256:50e16d954148559c9a74109af1eaf0c945ba2d8f30f0a3d3335edde19788b6f6"}, {file = "pytest-8.3.4.tar.gz", hash = "sha256:965370d062bce11e73868e0335abac31b4d3de0e82f4007408d242b4f8610761"}, @@ -247,6 +297,7 @@ version = "1.3.1" description = "Sniff out which async library your code is running under" optional = false python-versions = ">=3.7" +groups = ["dev"] files = [ {file = "sniffio-1.3.1-py3-none-any.whl", hash = "sha256:2f6da418d1f1e0fddd844478f41680e794e6051915791a034ff65e5f100525a2"}, {file = "sniffio-1.3.1.tar.gz", hash = "sha256:f4324edc670a0f49750a81b895f35c3adb843cca46f0530f79fc1babb23789dc"}, @@ -258,6 +309,8 @@ version = "4.12.2" description = "Backported and Experimental Type Hints for Python 3.8+" optional = false python-versions = ">=3.8" +groups = ["dev"] +markers = "python_version < \"3.13\"" files = [ {file = "typing_extensions-4.12.2-py3-none-any.whl", hash = "sha256:04e5ca0351e0f3f85c6853954072df659d0d13fac324d0072316b67d7794700d"}, {file = "typing_extensions-4.12.2.tar.gz", hash = "sha256:1a7ead55c7e559dd4dee8856e3a88b41225abfe1ce8df57b7c13915fe121ffb8"}, @@ -269,6 +322,7 @@ version = "1.2.0" description = "WebSockets state-machine based protocol implementation" optional = false python-versions = ">=3.7.0" +groups = ["main"] files = [ {file = "wsproto-1.2.0-py3-none-any.whl", hash = "sha256:b9acddd652b585d75b20477888c56642fdade28bdfd3579aa24a4d2c037dd736"}, {file = "wsproto-1.2.0.tar.gz", hash = "sha256:ad565f26ecb92588a3e43bc3d96164de84cd9902482b130d0ddbaa9664a85065"}, @@ -278,6 +332,6 @@ files = [ h11 = ">=0.9.0,<1" [metadata] -lock-version = "2.0" +lock-version = "2.1" python-versions = "^3.12" -content-hash = "3db36d999bf00702369a8ebf4e486e8029d845bfee6e2755d7dd3c25a9c99e27" +content-hash = "457fc395a8f5ce1174f4e9f7e525c08ab7da87e99d158a3fd2150f52d7d3939f" diff --git a/pyproject.toml b/pyproject.toml index 63917769..5a17b16a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,6 +10,7 @@ repository = "https://github.com/knative-extensions/func-python" [tool.poetry.dependencies] python = "^3.12" hypercorn = "^0.17.3" +cloudevents = "^1.11.0" [tool.pytest.ini_options] pythonpath = ["src"] diff --git a/src/func_python/cloudevent.py b/src/func_python/cloudevent.py new file mode 100644 index 00000000..a8ce3071 --- /dev/null +++ b/src/func_python/cloudevent.py @@ -0,0 +1,278 @@ +import asyncio +import logging +import os +import signal +import hypercorn.config +import hypercorn.asyncio + +from cloudevents.http import from_http, CloudEvent +from cloudevents.conversion import to_structured, to_binary + +DEFAULT_LOG_LEVEL = logging.INFO +DEFAULT_LISTEN_ADDRESS = "127.0.0.1:8080" + +logging.basicConfig(level=DEFAULT_LOG_LEVEL) + + +def serve(f): + """serve a function f by wrapping it in an ASGI web application + and starting. The function can be either a constructor for a functon + instance (named "new") or a simple ASGI handler function (named "handle"). + """ + logging.debug("func runtime creating function instance") + + if f.__name__ == 'new': + return ASGIApplication(f()).serve() + elif f.__name__ == 'handle': + try: + return ASGIApplication(DefaultFunction(f)).serve() + except Exception as e: + logging.error(f"Server failed to start: {e}") + raise + else: + raise ValueError("function must be either be a constructor 'new' or a " + "handler function 'handle'.") + + +class DefaultFunction: + """DefaultFunction is used when the provided functon is not a constructor + for a Function instance, but rather a simple handler function""" + + def __init__(self, handler): + self.handle = handler + + async def handle(self, scope, receive, send): + # delegate to the handler implementation provided during construction. + await self.handle(scope, receive, send) + + +class ASGIApplication(): + def __init__(self, f): + self.f = f + self.stop_event = asyncio.Event() + if hasattr(self.f, "handle") is not True: + raise AttributeError("Function must implement a 'handle' method.") + + # Inform the user via logs that defaults will be used for health + # endpoints if no matchin methods were provided. + if hasattr(self.f, "alive") is not True: + logging.info( + "function does not implement 'alive'. Using default " + "implementation for liveness checks." + ) + if hasattr(self.f, "ready") is not True: + logging.info( + "function does not implement 'ready'. Using default " + "implementation for readiness checks." + ) + + def serve(self): + """serve serving this ASGIhandler, delegating implementation of + methods as necessary to the wrapped Function instance""" + cfg = hypercorn.config.Config() + cfg.bind = [os.getenv('LISTEN_ADDRESS', DEFAULT_LISTEN_ADDRESS)] + + logging.info(f"function starting on {cfg.bind}") + return asyncio.run(self._serve(cfg)) + + async def _serve(self, cfg): + loop = asyncio.get_event_loop() + loop.add_signal_handler(signal.SIGINT, self._handle_signal) + loop.add_signal_handler(signal.SIGTERM, self._handle_signal) + + await hypercorn.asyncio.serve(self, cfg) + + def _handle_signal(self): + logging.info("Signal received: initiating shutdown") + self.stop_event.set() + + async def on_start(self): + """on_start handles the ASGI server start event, delegating control + to the internal Function instance if it has a "start" method.""" + if hasattr(self.f, "start"): + self.f.start(os.environ.copy()) + else: + logging.debug("function does not implement 'start'. Skipping.") + + async def on_stop(self): + if hasattr(self.f, "stop"): + self.f.stop() + else: + logging.debug("function does not implement 'stop'. Skipping.") + self.stop_event.set() + + async def __call__(self, scope, receive, send): + if scope['type'] == 'lifespan': + while True: + message = await receive() + if message['type'] == 'lifespan.startup': + await self.on_start() + await send({'type': 'lifespan.startup.complete'}) + elif message['type'] == 'lifespan.shutdown': + await self.on_stop() + await send({'type': 'lifespan.shutdown.complete'}) + return + else: + break + + # Assert request is HTTP + if scope["type"] != "http": + await send_exception(send, 400, + "Functions currently only support ASGI/HTTP " + f"connections. Got {scope['type']}" + ) + return + + # Route request + try: + if scope['path'] == '/health/liveness': + await self.handle_liveness(scope, receive, send) + elif scope['path'] == '/health/readiness': + await self.handle_readiness(scope, receive, send) + else: + # CloudEvents Middleware + # Currently the http and cloudevents middleware implementations + # are identical with the exception of this section which + # reads the request as a CloudEvent and adds it to the scope, + # and sends a response CloudEvent if returned. + # Should this implementation prove adequate, we can combine + # into a single middleware with a swithch to enable this + # interstitial encode/decode, and thus avoid the approx. 200 + # lines of shared server boilerplate. + # + # Decode the event and make it available in the scope + scope["event"] = await decode_event(scope, receive) + # Wrap the sender in a CloudEventSender + send = CloudEventSender(send) + # Delegate processing to user's Function + await self.f.handle(scope, receive, send) + except Exception as e: + await send_exception_cloudevent(send, 500, f"Error: {e}") + + async def handle_liveness(self, scope, receive, send): + alive = True + message = "OK" + if hasattr(self.f, "alive"): + result = self.f.alive() + # The message return is optional + if isinstance(result, tuple): + alive, message = result + else: + alive = result + + if alive: + await send({'type': 'http.response.start', 'status': 200, + 'headers': [[b'content-type', b'text/plain']]}) + else: + await send({'type': 'http.response.start', 'status': 500, + 'headers': [[b'content-type', b'text/plain']]}) + + await send({'type': 'http.response.body', + 'body': f'{message}'.encode('utf-8'), + }) + + async def handle_readiness(self, scope, receive, send): + ready = True + message = "OK" + if hasattr(self.f, "ready"): + result = self.f.ready() + # The message return is optional + if isinstance(result, tuple): + ready, message = result + else: + ready = result + + if ready: + await send({'type': 'http.response.start', 'status': 200, + 'headers': [[b'content-type', b'text/plain']]}) + else: + await send({'type': 'http.response.start', 'status': 500, + 'headers': [[b'content-type', b'text/plain']]}) + + await send({'type': 'http.response.body', + 'body': f'{message}'.encode('utf-8'), + }) + + +async def decode_event(scope, receive): + body = await receive_body(receive) + headers = { + k.decode("utf-8").lower(): v.decode("utf-8") + for k, v in scope.get("headers", []) + } + return from_http(headers, body) + + +async def receive_body(receive): + """For CloudEvents: receive the body and return it as bytes""" + body = b"" + more_body = True + while more_body: + message = await receive() + body += message.get("body", b"") + more_body = message.get("more_body", False) + return body + + +async def send_exception(send, code, message): + await send({ + 'type': 'http.response.start', 'status': code, + 'headers': [[b'content-type', b'text/plain']], + }) + await send({ + 'type': 'http.response.body', 'body': message, + }) + + +async def send_exception_cloudevent(send, status, message): + attributes = { + "type": "dev.functions.error", + "source": "/cloudevent/error", + } + data = {"message": message} + + await send.structured(CloudEvent(attributes, data), status) + + +class CloudEventSender: + """A sender which supports CloudEvents""" + + def __init__(self, send): + self._send = send + + async def __call__(self, event, status: int = 200): + """default send assumes a strcutred event""" + await self.structured(event, status) + + async def structured(self, event, status=200): + """send as a structured cloudevent""" + headers, body = to_structured(event) + await self._send_encoded_cloudevent(headers, body, status) + + async def binary(self, event, status=200): + """send as a binary cloudevent""" + headers, body = to_binary(event) + await self._send_encoded_cloudevent(headers, body, status) + + async def http(self, message): + """Send a raw http response, bypassing the automatic cloudevent + encoding. Use this for more granular control of the response.""" + self._send(message) + + async def _send_encoded_cloudevent(self, headers, body, status=200): + """Send the given cloudevent headers and body.""" + headers = [ + (k.encode(), v.encode()) + for k, v in headers.items() + ] + [(b"content-length", str(len(body)).encode())] + + await self._send({ + "type": "http.response.start", + "status": status, + "headers": headers + }) + + await self._send({ + "type": "http.response.body", + "body": body, + }) diff --git a/src/func_python/example.py b/src/func_python/example.py new file mode 100644 index 00000000..5ccb7962 --- /dev/null +++ b/src/func_python/example.py @@ -0,0 +1,125 @@ +from typing import Dict, Any, Callable, Awaitable, Optional +from functools import partial +from cloudevents.http import from_http, to_structured +from cloudevents.exceptions import MissingRequiredFields, InvalidRequiredFields + +async def receive_body(receive: Callable) -> bytes: + """Utility to receive the complete body from an ASGI receive callable""" + body = b"" + more_body = True + while more_body: + message = await receive() + body += message.get("body", b"") + more_body = message.get("more_body", False) + return body + +class CloudEventSender: """Wraps the ASGI send callable to provide CloudEvent-aware sending""" + + def __init__(self, send: Callable[[Dict[str, Any]], Awaitable[None]]): + self._send = send + + async def __call__(self, message: Dict[str, Any]) -> None: + await self._send(message) + + async def send_event(self, event: Dict[str, Any], status: int = 200) -> None: + """Send a CloudEvent as an HTTP response""" + headers, body = to_structured(event) + + await self._send({ + "type": "http.response.start", + "status": status, + "headers": [ + (k.encode(), v.encode()) + for k, v in headers.items() + ] + [(b"content-length", str(len(body)).encode())] + }) + + await self._send({ + "type": "http.response.body", + "body": body, + }) + + async def send_error(self, status: int, message: str, error_type: str = "error") -> None: + """Send an error as a CloudEvent""" + error_event = { + "specversion": "1.0", + "type": f"com.example.{error_type}", + "source": "/cloudevent/error", + "id": "error-response", + "data": {"error": message} + } + await self.send_event(error_event, status) + +class CloudEventMiddleware: + def __init__(self, app: Callable): + self.app = app + + async def __call__( + self, + scope: Dict[str, Any], + receive: Callable, + send: Callable + ) -> None: + if scope["type"] != "http": + await self.app(scope, receive, send) + return + + try: + # Get the body and convert headers + body = await receive_body(receive) + headers = { + k.decode("utf-8").lower(): v.decode("utf-8") + for k, v in scope.get("headers", []) + } + + # Parse as CloudEvent and add to scope + event = from_http(headers, body) + scope["cloudevent"] = event + + # Wrap the send callable + cloud_sender = CloudEventSender(send) + + # Call the wrapped application + await self.app(scope, receive, cloud_sender) + + except (MissingRequiredFields, InvalidRequiredFields) as e: + # If it's not a valid CloudEvent, send error response + sender = CloudEventSender(send) + await sender.send_error(400, str(e), "validation.error") + except Exception as e: + # Handle unexpected errors + sender = CloudEventSender(send) + await sender.send_error(500, str(e), "internal.error") + +# Example usage: +async def app(scope: Dict[str, Any], receive: Callable, send: Callable) -> None: + """ + Example ASGI application using the enhanced scope and sender + """ + # The CloudEvent is already parsed and available in scope + event = scope["cloudevent"] + + # The send callable is wrapped to handle CloudEvents + sender = send # TypeVar would show this as CloudEventSender + + # Example response using the enhanced sender + response_event = { + "specversion": "1.0", + "type": "com.example.response", + "source": scope["path"], + "id": f"response-to-{event['id']}", + "data": { + "message": "Processed successfully", + "method": scope["method"], + "client": scope["client"] + } + } + + await sender.send_event(response_event) + +# Wrap your application with the middleware +app = CloudEventMiddleware(app) + +# Run with: +# hypercorn app:app --bind 0.0.0.0:3000 + diff --git a/tests/test_cloudevent.py b/tests/test_cloudevent.py new file mode 100644 index 00000000..2549841c --- /dev/null +++ b/tests/test_cloudevent.py @@ -0,0 +1,226 @@ +import httpx +import json +import logging +import os +import signal +import threading +import time +import uuid +import pytest +from func_python.cloudevent import serve +from cloudevents.conversion import to_structured +from cloudevents.http import CloudEvent + +logging.basicConfig(level=logging.INFO) + +# Set a dynamic test URL using an environment variable +os.environ["LISTEN_ADDRESS"] = os.getenv("LISTEN_ADDRESS", "127.0.0.1:8081") + +# Retrieve the LISTEN_ADDRESS for use in the tests +LISTEN_ADDRESS = os.getenv("LISTEN_ADDRESS") + + +def test_static(): + """ + A basic test which ensures that serving a static "handle" method + succeeds without failure. + """ + + # Function + # An example minimal "static" user function which will be + # exposed on the network as an ASGI service by the middleware. + async def handle(scope, receive, send): + # Ensure that the scope contains the CloudEvent + if "event" not in scope: + await send(CloudEvent({}, {"message": "no event in scope"}), 500) + + attributes = { + "type": "com.example.teststatic", + "source": "https://example.com/event-producer", + } + content = {"message": "OK test_static"} + + # The default send encodes the cloudevent using to_structured. + # use send.binary to send it binary encoded, or send.http to use + # the raw ASGI response object without CloudEvent middleware. + await send(CloudEvent(attributes, content)) + + # Test + # Run async, this attempts to contact the running function and confirm the + # handle method was successfully served. + test_complete = threading.Event() + test_results = {"success": False, "error": None} + + def test(): + try: + wait_for_function() # to become available + + # Send a CloudEvent to the Function + attributes = { + "type": "com.example.test-static", + "source": "https://example.com/event-producer", + } + data = {"message": "test_static"} + headers, content = to_structured(CloudEvent(attributes, data)) + response = httpx.post( + f"http://{LISTEN_ADDRESS}", + headers=headers, + content=content + ) + + # Assertions + assert response.status_code == 200 + response_event = json.loads(response.text) + assert response_event["data"]["message"] == "OK test_static" + test_results["success"] = True + except Exception as e: + test_results["error"] = str(e) + finally: + test_complete.set() # signal test completion + os.kill(os.getpid(), signal.SIGINT) # gracefully term Function + + # Start the test loop asynchronously + test_thread = threading.Thread(target=test) + test_thread.daemon = True # exit when test does + test_thread.start() + + # Serve the handle function + # Note this will fail if not in the main thread due to "set_wakeup_fd" + serve(handle) + + if not test_complete.wait(10): + pytest.fail("Test timed out") + + if not test_results["success"]: + pytest.fail(test_results["error"] or "Test failed") + + +def test_instanced(): + """ + ensures that a user function developed using the default "instanced" + style is served by the middleware + """ + + # User Function + # An example standard "instanced" function (user's Function) which is + # exposed on the network as an ASGI service by the middleware. + class MyFunction: + async def handle(self, scope, receive, send): + # Check if this is a CloudEvent + if "event" not in scope: + await send(CloudEvent({},{"message": "no event in scope"}), 500) + + attributes = { + "type": "com.example.testinstanced", + "source": "https://example.com/event-producer", + } + content = {"message": "OK test_instanced"} + + # The default send encodes the cloudevent using to_structured. + # use send.binary to send it binary encoded, or send.http to use + # the raw ASGI response object without CloudEvent middleware. + await send(CloudEvent(attributes, content)) + + def new(): + return MyFunction() + + # Tests + # Attempts to contact the running function and confirm the user's function + # was instantiated and served. + test_complete = threading.Event() + test_results = {"success": False, "error": None} + + def test(): + try: + wait_for_function() # to become available + + # Send a CloudEvent to the Function + attributes = { + "type": "com.example.test-static", + "source": "https://example.com/event-producer", + } + data = {"message": "test_instanced"} + headers, content = to_structured(CloudEvent(attributes, data)) + response = httpx.post( + f"http://{LISTEN_ADDRESS}", + headers=headers, + content=content + ) + + # Assertions + assert response.status_code == 200 + response_event = json.loads(response.text) + assert response_event["data"]["message"] == "OK test_instanced" + test_results["success"] = True + except Exception as e: + test_results["error"] = str(e) + finally: + test_complete.set() # signal test completion + os.kill(os.getpid(), signal.SIGINT) # gracefully term Function + + # Start the test loop asynchronously + test_thread = threading.Thread(target=test) + test_thread.daemon = True # exit when test does + test_thread.start() + + # Serve the Function + serve(new) + + if not test_complete.wait(10): + pytest.fail("Test timed out") + + if not test_results["success"]: + pytest.fail(test_results["error"] or "Test failed") + + +def test_signal_handling(): + """ + Tests that the server gracefully shuts down when receiving a SIGINT signal. + """ + # Example minimal ASGI app that handles CloudEvents + async def handle(scope, receive, send): + # For the signal handling test, we just need the server to start + # and then shut down gracefully, so we keep the handler simple + await send({ + 'type': 'http.response.start', + 'status': 200, + 'headers': [[b'content-type', b'text/plain']], + }) + await send({ + 'type': 'http.response.body', + 'body': b'Signal Handling OK', + }) + + # Function to send a SIGINT after a delay + def send_signal(): + time.sleep(2) # Allow server to start + os.kill(os.getpid(), signal.SIGINT) + + # Start signal sender in a separate thread + signal_thread = threading.Thread(target=send_signal) + signal_thread.start() + + # Serve the function + try: + serve(handle) + except KeyboardInterrupt: + logging.info("SIGINT received and handled gracefully.") + + signal_thread.join(timeout=5) + + +class FunctionNotAvailableError(Exception): + """ Raised when a Function is not available """ + + +def wait_for_function(): + max_retries = 20 + for i in range(max_retries): + time.sleep(0.5) + try: + httpx.get(f"http://{LISTEN_ADDRESS}/health/liveness") + break + except httpx.ConnectError: + logging.info(f"Retrying ({i+1}/{max_retries})...") + if i >= max_retries: + raise FunctionNotAvailableError(f"Function at {LISTEN_ADDRESS} did not start after {max_retries} attempts") diff --git a/tests/test_http.py b/tests/test_http.py index 45a39914..298590de 100644 --- a/tests/test_http.py +++ b/tests/test_http.py @@ -73,6 +73,7 @@ def test(): test_thread.join(timeout=5) + def test_instanced(): """ ensures that a user function developed using the default "instanced" @@ -134,6 +135,7 @@ def test(): test_thread.join(timeout=5) + def test_signal_handling(): """ Tests that the server gracefully shuts down when receiving a SIGINT signal. From 8ad9e717ef598bff9337c8d12c58d9acc831e157 Mon Sep 17 00:00:00 2001 From: Luke Kingland Date: Mon, 3 Mar 2025 11:26:09 +0900 Subject: [PATCH 4/5] reorg README --- README.md | 51 ++++++++++++++++++++++++++------------------------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index 973ad6e5..a0b480ac 100644 --- a/README.md +++ b/README.md @@ -66,6 +66,32 @@ p. From a personal fork, create a new worktree for the bug, feature or chore 8. (optional) pull into local fork's main and push to remote fork main. +## Testing + +To run the package level tests + +To test if the package published to Test PyPI works, ensure the +project has it added as an explicit source in pyproject.toml: +``` +[[tool.poetry.source]] +name = "test-pypi" +url = "https://test.pypi.org/simple/" +priority = "explicit" +``` +Then update the dependency to explicitly pull the new version which is only +available on TestPyPI. For example to test a hypothetical unreleased version +0.1.2: +``` +[tool.poetry.dependencies] +func_python = {version = "0.1.2", source = "test-pypi"} +``` +Run `poetry install` to install the unreleased version from Test PyPI + +Note: do not check in this change. + + + + ## Releasing NOTE: This process is currently undergoing minor tweaks, and thus contains @@ -102,31 +128,6 @@ automated. 10. Update the GitHub release's notes to be the changelog section's contents . -## Testing - -To run the package level tests - -To test if the package published to Test PyPI works, ensure the -project has it added as an explicit source in pyproject.toml: -``` -[[tool.poetry.source]] -name = "test-pypi" -url = "https://test.pypi.org/simple/" -priority = "explicit" -``` -Then update the dependency to explicitly pull the new version which is only -available on TestPyPI. For example to test a hypothetical unreleased version -0.1.2: -``` -[tool.poetry.dependencies] -func_python = {version = "0.1.2", source = "test-pypi"} -``` -Run `poetry install` to install the unreleased version from Test PyPI - -Note: do not check in this change. - - - ### Potential improvements From 4418ad534901318bc42d751558b73f2e625bb71e Mon Sep 17 00:00:00 2001 From: Luke Kingland Date: Mon, 3 Mar 2025 11:29:10 +0900 Subject: [PATCH 5/5] remove example --- src/func_python/example.py | 125 ------------------------------------- 1 file changed, 125 deletions(-) delete mode 100644 src/func_python/example.py diff --git a/src/func_python/example.py b/src/func_python/example.py deleted file mode 100644 index 5ccb7962..00000000 --- a/src/func_python/example.py +++ /dev/null @@ -1,125 +0,0 @@ -from typing import Dict, Any, Callable, Awaitable, Optional -from functools import partial -from cloudevents.http import from_http, to_structured -from cloudevents.exceptions import MissingRequiredFields, InvalidRequiredFields - -async def receive_body(receive: Callable) -> bytes: - """Utility to receive the complete body from an ASGI receive callable""" - body = b"" - more_body = True - while more_body: - message = await receive() - body += message.get("body", b"") - more_body = message.get("more_body", False) - return body - -class CloudEventSender: """Wraps the ASGI send callable to provide CloudEvent-aware sending""" - - def __init__(self, send: Callable[[Dict[str, Any]], Awaitable[None]]): - self._send = send - - async def __call__(self, message: Dict[str, Any]) -> None: - await self._send(message) - - async def send_event(self, event: Dict[str, Any], status: int = 200) -> None: - """Send a CloudEvent as an HTTP response""" - headers, body = to_structured(event) - - await self._send({ - "type": "http.response.start", - "status": status, - "headers": [ - (k.encode(), v.encode()) - for k, v in headers.items() - ] + [(b"content-length", str(len(body)).encode())] - }) - - await self._send({ - "type": "http.response.body", - "body": body, - }) - - async def send_error(self, status: int, message: str, error_type: str = "error") -> None: - """Send an error as a CloudEvent""" - error_event = { - "specversion": "1.0", - "type": f"com.example.{error_type}", - "source": "/cloudevent/error", - "id": "error-response", - "data": {"error": message} - } - await self.send_event(error_event, status) - -class CloudEventMiddleware: - def __init__(self, app: Callable): - self.app = app - - async def __call__( - self, - scope: Dict[str, Any], - receive: Callable, - send: Callable - ) -> None: - if scope["type"] != "http": - await self.app(scope, receive, send) - return - - try: - # Get the body and convert headers - body = await receive_body(receive) - headers = { - k.decode("utf-8").lower(): v.decode("utf-8") - for k, v in scope.get("headers", []) - } - - # Parse as CloudEvent and add to scope - event = from_http(headers, body) - scope["cloudevent"] = event - - # Wrap the send callable - cloud_sender = CloudEventSender(send) - - # Call the wrapped application - await self.app(scope, receive, cloud_sender) - - except (MissingRequiredFields, InvalidRequiredFields) as e: - # If it's not a valid CloudEvent, send error response - sender = CloudEventSender(send) - await sender.send_error(400, str(e), "validation.error") - except Exception as e: - # Handle unexpected errors - sender = CloudEventSender(send) - await sender.send_error(500, str(e), "internal.error") - -# Example usage: -async def app(scope: Dict[str, Any], receive: Callable, send: Callable) -> None: - """ - Example ASGI application using the enhanced scope and sender - """ - # The CloudEvent is already parsed and available in scope - event = scope["cloudevent"] - - # The send callable is wrapped to handle CloudEvents - sender = send # TypeVar would show this as CloudEventSender - - # Example response using the enhanced sender - response_event = { - "specversion": "1.0", - "type": "com.example.response", - "source": scope["path"], - "id": f"response-to-{event['id']}", - "data": { - "message": "Processed successfully", - "method": scope["method"], - "client": scope["client"] - } - } - - await sender.send_event(response_event) - -# Wrap your application with the middleware -app = CloudEventMiddleware(app) - -# Run with: -# hypercorn app:app --bind 0.0.0.0:3000 -