diff --git a/README.md b/README.md index 1c6c8667..a0b480ac 100644 --- a/README.md +++ b/README.md @@ -7,12 +7,15 @@ Python as a network service. ``` . ├── cmd -│   └── fhttp - Example a Function using the http middleware +│   └── fhttp - Example Function using the http middleware +│   └── fce - Example Function using the CloudEvent middleware ├── src/func_python -│   ├── http.py - HTTP Middleware +│   ├── http.py - HTTP Middleware +│   ├── cloudevents.py - CloudEvent Middleware ├── tests -│   ├── http_test.py - HTTP Middleware tests -└── README.md - This Readme +│   ├── http_test.py - HTTP Middleware tests +│   ├── cloudevent_test.py - CloudEvent tests +└── README.md - This Readme ``` ## Development @@ -25,7 +28,7 @@ Run suite: To enable more granular log levels: `poetry run pytest --log-cli-level=INFO` -## Example Main +## Example Commands Minimal example of running the test command, which shows how this library is used when integrate with Functions, and can be useful during dev. @@ -63,6 +66,32 @@ p. From a personal fork, create a new worktree for the bug, feature or chore 8. (optional) pull into local fork's main and push to remote fork main. +## Testing + +To run the package level tests + +To test if the package published to Test PyPI works, ensure the +project has it added as an explicit source in pyproject.toml: +``` +[[tool.poetry.source]] +name = "test-pypi" +url = "https://test.pypi.org/simple/" +priority = "explicit" +``` +Then update the dependency to explicitly pull the new version which is only +available on TestPyPI. For example to test a hypothetical unreleased version +0.1.2: +``` +[tool.poetry.dependencies] +func_python = {version = "0.1.2", source = "test-pypi"} +``` +Run `poetry install` to install the unreleased version from Test PyPI + +Note: do not check in this change. + + + + ## Releasing NOTE: This process is currently undergoing minor tweaks, and thus contains @@ -99,31 +128,6 @@ automated. 10. Update the GitHub release's notes to be the changelog section's contents . -## Testing - -To run the package level tests - -To test if the package published to Test PyPI works, ensure the -project has it added as an explicit source in pyproject.toml: -``` -[[tool.poetry.source]] -name = "test-pypi" -url = "https://test.pypi.org/simple/" -priority = "explicit" -``` -Then update the dependency to explicitly pull the new version which is only -available on TestPyPI. For example to test a hypothetical unreleased version -0.1.2: -``` -[tool.poetry.dependencies] -func_python = {version = "0.1.2", source = "test-pypi"} -``` -Run `poetry install` to install the unreleased version from Test PyPI - -Note: do not check in this change. - - - ### Potential improvements diff --git a/poetry.lock b/poetry.lock index 8def4ee2..4121ba93 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.4 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.0.1 and should not be changed by hand. [[package]] name = "anyio" @@ -6,6 +6,7 @@ version = "4.7.0" description = "High level compatibility layer for multiple asynchronous event loop implementations" optional = false python-versions = ">=3.9" +groups = ["dev"] files = [ {file = "anyio-4.7.0-py3-none-any.whl", hash = "sha256:ea60c3723ab42ba6fff7e8ccb0488c898ec538ff4df1f1d5e642c3601d07e352"}, {file = "anyio-4.7.0.tar.gz", hash = "sha256:2f834749c602966b7d456a7567cafcb309f96482b5081d14ac93ccd457f9dd48"}, @@ -27,28 +28,65 @@ version = "2024.8.30" description = "Python package for providing Mozilla's CA Bundle." optional = false python-versions = ">=3.6" +groups = ["dev"] files = [ {file = "certifi-2024.8.30-py3-none-any.whl", hash = "sha256:922820b53db7a7257ffbda3f597266d435245903d80737e34f8a45ff3e3230d8"}, {file = "certifi-2024.8.30.tar.gz", hash = "sha256:bec941d2aa8195e248a60b31ff9f0558284cf01a52591ceda73ea9afffd69fd9"}, ] +[[package]] +name = "cloudevents" +version = "1.11.0" +description = "CloudEvents Python SDK" +optional = false +python-versions = "*" +groups = ["main"] +files = [ + {file = "cloudevents-1.11.0-py3-none-any.whl", hash = "sha256:77edb4f2b01f405c44ea77120c3213418dbc63d8859f98e9e85de875502b8a76"}, + {file = "cloudevents-1.11.0.tar.gz", hash = "sha256:5be990583e99f3b08af5a709460e20b25cb169270227957a20b47a6ec8635e66"}, +] + +[package.dependencies] +deprecation = ">=2.0,<3.0" + +[package.extras] +pydantic = ["pydantic (>=1.0.0,<3.0)"] + [[package]] name = "colorama" version = "0.4.6" description = "Cross-platform colored terminal text." optional = false python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7" +groups = ["dev"] +markers = "sys_platform == \"win32\"" files = [ {file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"}, {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"}, ] +[[package]] +name = "deprecation" +version = "2.1.0" +description = "A library to handle automated deprecations" +optional = false +python-versions = "*" +groups = ["main"] +files = [ + {file = "deprecation-2.1.0-py2.py3-none-any.whl", hash = "sha256:a10811591210e1fb0e768a8c25517cabeabcba6f0bf96564f8ff45189f90b14a"}, + {file = "deprecation-2.1.0.tar.gz", hash = "sha256:72b3bde64e5d778694b0cf68178aed03d15e15477116add3fb773e581f9518ff"}, +] + +[package.dependencies] +packaging = "*" + [[package]] name = "h11" version = "0.14.0" description = "A pure-Python, bring-your-own-I/O implementation of HTTP/1.1" optional = false python-versions = ">=3.7" +groups = ["main", "dev"] files = [ {file = "h11-0.14.0-py3-none-any.whl", hash = "sha256:e3fe4ac4b851c468cc8363d500db52c2ead036020723024a109d37346efaa761"}, {file = "h11-0.14.0.tar.gz", hash = "sha256:8f19fbbe99e72420ff35c00b27a34cb9937e902a8b810e2c88300c6f0a3b699d"}, @@ -60,6 +98,7 @@ version = "4.1.0" description = "HTTP/2 State-Machine based protocol implementation" optional = false python-versions = ">=3.6.1" +groups = ["main"] files = [ {file = "h2-4.1.0-py3-none-any.whl", hash = "sha256:03a46bcf682256c95b5fd9e9a99c1323584c3eec6440d379b9903d709476bc6d"}, {file = "h2-4.1.0.tar.gz", hash = "sha256:a83aca08fbe7aacb79fec788c9c0bac936343560ed9ec18b82a13a12c28d2abb"}, @@ -75,6 +114,7 @@ version = "4.0.0" description = "Pure-Python HPACK header compression" optional = false python-versions = ">=3.6.1" +groups = ["main"] files = [ {file = "hpack-4.0.0-py3-none-any.whl", hash = "sha256:84a076fad3dc9a9f8063ccb8041ef100867b1878b25ef0ee63847a5d53818a6c"}, {file = "hpack-4.0.0.tar.gz", hash = "sha256:fc41de0c63e687ebffde81187a948221294896f6bdc0ae2312708df339430095"}, @@ -86,6 +126,7 @@ version = "1.0.7" description = "A minimal low-level HTTP client." optional = false python-versions = ">=3.8" +groups = ["dev"] files = [ {file = "httpcore-1.0.7-py3-none-any.whl", hash = "sha256:a3fff8f43dc260d5bd363d9f9cf1830fa3a458b332856f34282de498ed420edd"}, {file = "httpcore-1.0.7.tar.gz", hash = "sha256:8551cb62a169ec7162ac7be8d4817d561f60e08eaa485234898414bb5a8a0b4c"}, @@ -107,6 +148,7 @@ version = "0.28.1" description = "The next generation HTTP client." optional = false python-versions = ">=3.8" +groups = ["dev"] files = [ {file = "httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad"}, {file = "httpx-0.28.1.tar.gz", hash = "sha256:75e98c5f16b0f35b567856f597f06ff2270a374470a5c2392242528e3e3e42fc"}, @@ -131,6 +173,7 @@ version = "0.17.3" description = "A ASGI Server based on Hyper libraries and inspired by Gunicorn" optional = false python-versions = ">=3.8" +groups = ["main"] files = [ {file = "hypercorn-0.17.3-py3-none-any.whl", hash = "sha256:059215dec34537f9d40a69258d323f56344805efb462959e727152b0aa504547"}, {file = "hypercorn-0.17.3.tar.gz", hash = "sha256:1b37802ee3ac52d2d85270700d565787ab16cf19e1462ccfa9f089ca17574165"}, @@ -154,6 +197,7 @@ version = "6.0.1" description = "HTTP/2 framing layer for Python" optional = false python-versions = ">=3.6.1" +groups = ["main"] files = [ {file = "hyperframe-6.0.1-py3-none-any.whl", hash = "sha256:0ec6bafd80d8ad2195c4f03aacba3a8265e57bc4cff261e802bf39970ed02a15"}, {file = "hyperframe-6.0.1.tar.gz", hash = "sha256:ae510046231dc8e9ecb1a6586f63d2347bf4c8905914aa84ba585ae85f28a914"}, @@ -165,6 +209,7 @@ version = "3.10" description = "Internationalized Domain Names in Applications (IDNA)" optional = false python-versions = ">=3.6" +groups = ["dev"] files = [ {file = "idna-3.10-py3-none-any.whl", hash = "sha256:946d195a0d259cbba61165e88e65941f16e9b36ea6ddb97f00452bae8b1287d3"}, {file = "idna-3.10.tar.gz", hash = "sha256:12f65c9b470abda6dc35cf8e63cc574b1c52b11df2c86030af0ac09b01b13ea9"}, @@ -179,6 +224,7 @@ version = "2.0.0" description = "brain-dead simple config-ini parsing" optional = false python-versions = ">=3.7" +groups = ["dev"] files = [ {file = "iniconfig-2.0.0-py3-none-any.whl", hash = "sha256:b6a85871a79d2e3b22d2d1b94ac2824226a63c6b741c88f7ae975f18b6778374"}, {file = "iniconfig-2.0.0.tar.gz", hash = "sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3"}, @@ -190,6 +236,7 @@ version = "24.2" description = "Core utilities for Python packages" optional = false python-versions = ">=3.8" +groups = ["main", "dev"] files = [ {file = "packaging-24.2-py3-none-any.whl", hash = "sha256:09abb1bccd265c01f4a3aa3f7a7db064b36514d2cba19a2f694fe6150451a759"}, {file = "packaging-24.2.tar.gz", hash = "sha256:c228a6dc5e932d346bc5739379109d49e8853dd8223571c7c5b55260edc0b97f"}, @@ -201,6 +248,7 @@ version = "1.5.0" description = "plugin and hook calling mechanisms for python" optional = false python-versions = ">=3.8" +groups = ["dev"] files = [ {file = "pluggy-1.5.0-py3-none-any.whl", hash = "sha256:44e1ad92c8ca002de6377e165f3e0f1be63266ab4d554740532335b9d75ea669"}, {file = "pluggy-1.5.0.tar.gz", hash = "sha256:2cffa88e94fdc978c4c574f15f9e59b7f4201d439195c3715ca9e2486f1d0cf1"}, @@ -216,6 +264,7 @@ version = "2.0.0" description = "A pure-Python implementation of the HTTP/2 priority tree" optional = false python-versions = ">=3.6.1" +groups = ["main"] files = [ {file = "priority-2.0.0-py3-none-any.whl", hash = "sha256:6f8eefce5f3ad59baf2c080a664037bb4725cd0a790d53d59ab4059288faf6aa"}, {file = "priority-2.0.0.tar.gz", hash = "sha256:c965d54f1b8d0d0b19479db3924c7c36cf672dbf2aec92d43fbdaf4492ba18c0"}, @@ -227,6 +276,7 @@ version = "8.3.4" description = "pytest: simple powerful testing with Python" optional = false python-versions = ">=3.8" +groups = ["dev"] files = [ {file = "pytest-8.3.4-py3-none-any.whl", hash = "sha256:50e16d954148559c9a74109af1eaf0c945ba2d8f30f0a3d3335edde19788b6f6"}, {file = "pytest-8.3.4.tar.gz", hash = "sha256:965370d062bce11e73868e0335abac31b4d3de0e82f4007408d242b4f8610761"}, @@ -247,6 +297,7 @@ version = "1.3.1" description = "Sniff out which async library your code is running under" optional = false python-versions = ">=3.7" +groups = ["dev"] files = [ {file = "sniffio-1.3.1-py3-none-any.whl", hash = "sha256:2f6da418d1f1e0fddd844478f41680e794e6051915791a034ff65e5f100525a2"}, {file = "sniffio-1.3.1.tar.gz", hash = "sha256:f4324edc670a0f49750a81b895f35c3adb843cca46f0530f79fc1babb23789dc"}, @@ -258,6 +309,8 @@ version = "4.12.2" description = "Backported and Experimental Type Hints for Python 3.8+" optional = false python-versions = ">=3.8" +groups = ["dev"] +markers = "python_version < \"3.13\"" files = [ {file = "typing_extensions-4.12.2-py3-none-any.whl", hash = "sha256:04e5ca0351e0f3f85c6853954072df659d0d13fac324d0072316b67d7794700d"}, {file = "typing_extensions-4.12.2.tar.gz", hash = "sha256:1a7ead55c7e559dd4dee8856e3a88b41225abfe1ce8df57b7c13915fe121ffb8"}, @@ -269,6 +322,7 @@ version = "1.2.0" description = "WebSockets state-machine based protocol implementation" optional = false python-versions = ">=3.7.0" +groups = ["main"] files = [ {file = "wsproto-1.2.0-py3-none-any.whl", hash = "sha256:b9acddd652b585d75b20477888c56642fdade28bdfd3579aa24a4d2c037dd736"}, {file = "wsproto-1.2.0.tar.gz", hash = "sha256:ad565f26ecb92588a3e43bc3d96164de84cd9902482b130d0ddbaa9664a85065"}, @@ -278,6 +332,6 @@ files = [ h11 = ">=0.9.0,<1" [metadata] -lock-version = "2.0" +lock-version = "2.1" python-versions = "^3.12" -content-hash = "3db36d999bf00702369a8ebf4e486e8029d845bfee6e2755d7dd3c25a9c99e27" +content-hash = "457fc395a8f5ce1174f4e9f7e525c08ab7da87e99d158a3fd2150f52d7d3939f" diff --git a/pyproject.toml b/pyproject.toml index 63917769..5a17b16a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,6 +10,7 @@ repository = "https://github.com/knative-extensions/func-python" [tool.poetry.dependencies] python = "^3.12" hypercorn = "^0.17.3" +cloudevents = "^1.11.0" [tool.pytest.ini_options] pythonpath = ["src"] diff --git a/src/func_python/cloudevent.py b/src/func_python/cloudevent.py new file mode 100644 index 00000000..a8ce3071 --- /dev/null +++ b/src/func_python/cloudevent.py @@ -0,0 +1,278 @@ +import asyncio +import logging +import os +import signal +import hypercorn.config +import hypercorn.asyncio + +from cloudevents.http import from_http, CloudEvent +from cloudevents.conversion import to_structured, to_binary + +DEFAULT_LOG_LEVEL = logging.INFO +DEFAULT_LISTEN_ADDRESS = "127.0.0.1:8080" + +logging.basicConfig(level=DEFAULT_LOG_LEVEL) + + +def serve(f): + """serve a function f by wrapping it in an ASGI web application + and starting. The function can be either a constructor for a functon + instance (named "new") or a simple ASGI handler function (named "handle"). + """ + logging.debug("func runtime creating function instance") + + if f.__name__ == 'new': + return ASGIApplication(f()).serve() + elif f.__name__ == 'handle': + try: + return ASGIApplication(DefaultFunction(f)).serve() + except Exception as e: + logging.error(f"Server failed to start: {e}") + raise + else: + raise ValueError("function must be either be a constructor 'new' or a " + "handler function 'handle'.") + + +class DefaultFunction: + """DefaultFunction is used when the provided functon is not a constructor + for a Function instance, but rather a simple handler function""" + + def __init__(self, handler): + self.handle = handler + + async def handle(self, scope, receive, send): + # delegate to the handler implementation provided during construction. + await self.handle(scope, receive, send) + + +class ASGIApplication(): + def __init__(self, f): + self.f = f + self.stop_event = asyncio.Event() + if hasattr(self.f, "handle") is not True: + raise AttributeError("Function must implement a 'handle' method.") + + # Inform the user via logs that defaults will be used for health + # endpoints if no matchin methods were provided. + if hasattr(self.f, "alive") is not True: + logging.info( + "function does not implement 'alive'. Using default " + "implementation for liveness checks." + ) + if hasattr(self.f, "ready") is not True: + logging.info( + "function does not implement 'ready'. Using default " + "implementation for readiness checks." + ) + + def serve(self): + """serve serving this ASGIhandler, delegating implementation of + methods as necessary to the wrapped Function instance""" + cfg = hypercorn.config.Config() + cfg.bind = [os.getenv('LISTEN_ADDRESS', DEFAULT_LISTEN_ADDRESS)] + + logging.info(f"function starting on {cfg.bind}") + return asyncio.run(self._serve(cfg)) + + async def _serve(self, cfg): + loop = asyncio.get_event_loop() + loop.add_signal_handler(signal.SIGINT, self._handle_signal) + loop.add_signal_handler(signal.SIGTERM, self._handle_signal) + + await hypercorn.asyncio.serve(self, cfg) + + def _handle_signal(self): + logging.info("Signal received: initiating shutdown") + self.stop_event.set() + + async def on_start(self): + """on_start handles the ASGI server start event, delegating control + to the internal Function instance if it has a "start" method.""" + if hasattr(self.f, "start"): + self.f.start(os.environ.copy()) + else: + logging.debug("function does not implement 'start'. Skipping.") + + async def on_stop(self): + if hasattr(self.f, "stop"): + self.f.stop() + else: + logging.debug("function does not implement 'stop'. Skipping.") + self.stop_event.set() + + async def __call__(self, scope, receive, send): + if scope['type'] == 'lifespan': + while True: + message = await receive() + if message['type'] == 'lifespan.startup': + await self.on_start() + await send({'type': 'lifespan.startup.complete'}) + elif message['type'] == 'lifespan.shutdown': + await self.on_stop() + await send({'type': 'lifespan.shutdown.complete'}) + return + else: + break + + # Assert request is HTTP + if scope["type"] != "http": + await send_exception(send, 400, + "Functions currently only support ASGI/HTTP " + f"connections. Got {scope['type']}" + ) + return + + # Route request + try: + if scope['path'] == '/health/liveness': + await self.handle_liveness(scope, receive, send) + elif scope['path'] == '/health/readiness': + await self.handle_readiness(scope, receive, send) + else: + # CloudEvents Middleware + # Currently the http and cloudevents middleware implementations + # are identical with the exception of this section which + # reads the request as a CloudEvent and adds it to the scope, + # and sends a response CloudEvent if returned. + # Should this implementation prove adequate, we can combine + # into a single middleware with a swithch to enable this + # interstitial encode/decode, and thus avoid the approx. 200 + # lines of shared server boilerplate. + # + # Decode the event and make it available in the scope + scope["event"] = await decode_event(scope, receive) + # Wrap the sender in a CloudEventSender + send = CloudEventSender(send) + # Delegate processing to user's Function + await self.f.handle(scope, receive, send) + except Exception as e: + await send_exception_cloudevent(send, 500, f"Error: {e}") + + async def handle_liveness(self, scope, receive, send): + alive = True + message = "OK" + if hasattr(self.f, "alive"): + result = self.f.alive() + # The message return is optional + if isinstance(result, tuple): + alive, message = result + else: + alive = result + + if alive: + await send({'type': 'http.response.start', 'status': 200, + 'headers': [[b'content-type', b'text/plain']]}) + else: + await send({'type': 'http.response.start', 'status': 500, + 'headers': [[b'content-type', b'text/plain']]}) + + await send({'type': 'http.response.body', + 'body': f'{message}'.encode('utf-8'), + }) + + async def handle_readiness(self, scope, receive, send): + ready = True + message = "OK" + if hasattr(self.f, "ready"): + result = self.f.ready() + # The message return is optional + if isinstance(result, tuple): + ready, message = result + else: + ready = result + + if ready: + await send({'type': 'http.response.start', 'status': 200, + 'headers': [[b'content-type', b'text/plain']]}) + else: + await send({'type': 'http.response.start', 'status': 500, + 'headers': [[b'content-type', b'text/plain']]}) + + await send({'type': 'http.response.body', + 'body': f'{message}'.encode('utf-8'), + }) + + +async def decode_event(scope, receive): + body = await receive_body(receive) + headers = { + k.decode("utf-8").lower(): v.decode("utf-8") + for k, v in scope.get("headers", []) + } + return from_http(headers, body) + + +async def receive_body(receive): + """For CloudEvents: receive the body and return it as bytes""" + body = b"" + more_body = True + while more_body: + message = await receive() + body += message.get("body", b"") + more_body = message.get("more_body", False) + return body + + +async def send_exception(send, code, message): + await send({ + 'type': 'http.response.start', 'status': code, + 'headers': [[b'content-type', b'text/plain']], + }) + await send({ + 'type': 'http.response.body', 'body': message, + }) + + +async def send_exception_cloudevent(send, status, message): + attributes = { + "type": "dev.functions.error", + "source": "/cloudevent/error", + } + data = {"message": message} + + await send.structured(CloudEvent(attributes, data), status) + + +class CloudEventSender: + """A sender which supports CloudEvents""" + + def __init__(self, send): + self._send = send + + async def __call__(self, event, status: int = 200): + """default send assumes a strcutred event""" + await self.structured(event, status) + + async def structured(self, event, status=200): + """send as a structured cloudevent""" + headers, body = to_structured(event) + await self._send_encoded_cloudevent(headers, body, status) + + async def binary(self, event, status=200): + """send as a binary cloudevent""" + headers, body = to_binary(event) + await self._send_encoded_cloudevent(headers, body, status) + + async def http(self, message): + """Send a raw http response, bypassing the automatic cloudevent + encoding. Use this for more granular control of the response.""" + self._send(message) + + async def _send_encoded_cloudevent(self, headers, body, status=200): + """Send the given cloudevent headers and body.""" + headers = [ + (k.encode(), v.encode()) + for k, v in headers.items() + ] + [(b"content-length", str(len(body)).encode())] + + await self._send({ + "type": "http.response.start", + "status": status, + "headers": headers + }) + + await self._send({ + "type": "http.response.body", + "body": body, + }) diff --git a/src/func_python/http.py b/src/func_python/http.py index 9394b2b9..00cad4a2 100644 --- a/src/func_python/http.py +++ b/src/func_python/http.py @@ -49,7 +49,7 @@ def __init__(self, f): self.f = f self.stop_event = asyncio.Event() if hasattr(self.f, "handle") is not True: - raise AttributeError( "Function must implement a 'handle' method.") + raise AttributeError("Function must implement a 'handle' method.") # Inform the user via logs that defaults will be used for health # endpoints if no matchin methods were provided. diff --git a/tests/test_cloudevent.py b/tests/test_cloudevent.py new file mode 100644 index 00000000..2549841c --- /dev/null +++ b/tests/test_cloudevent.py @@ -0,0 +1,226 @@ +import httpx +import json +import logging +import os +import signal +import threading +import time +import uuid +import pytest +from func_python.cloudevent import serve +from cloudevents.conversion import to_structured +from cloudevents.http import CloudEvent + +logging.basicConfig(level=logging.INFO) + +# Set a dynamic test URL using an environment variable +os.environ["LISTEN_ADDRESS"] = os.getenv("LISTEN_ADDRESS", "127.0.0.1:8081") + +# Retrieve the LISTEN_ADDRESS for use in the tests +LISTEN_ADDRESS = os.getenv("LISTEN_ADDRESS") + + +def test_static(): + """ + A basic test which ensures that serving a static "handle" method + succeeds without failure. + """ + + # Function + # An example minimal "static" user function which will be + # exposed on the network as an ASGI service by the middleware. + async def handle(scope, receive, send): + # Ensure that the scope contains the CloudEvent + if "event" not in scope: + await send(CloudEvent({}, {"message": "no event in scope"}), 500) + + attributes = { + "type": "com.example.teststatic", + "source": "https://example.com/event-producer", + } + content = {"message": "OK test_static"} + + # The default send encodes the cloudevent using to_structured. + # use send.binary to send it binary encoded, or send.http to use + # the raw ASGI response object without CloudEvent middleware. + await send(CloudEvent(attributes, content)) + + # Test + # Run async, this attempts to contact the running function and confirm the + # handle method was successfully served. + test_complete = threading.Event() + test_results = {"success": False, "error": None} + + def test(): + try: + wait_for_function() # to become available + + # Send a CloudEvent to the Function + attributes = { + "type": "com.example.test-static", + "source": "https://example.com/event-producer", + } + data = {"message": "test_static"} + headers, content = to_structured(CloudEvent(attributes, data)) + response = httpx.post( + f"http://{LISTEN_ADDRESS}", + headers=headers, + content=content + ) + + # Assertions + assert response.status_code == 200 + response_event = json.loads(response.text) + assert response_event["data"]["message"] == "OK test_static" + test_results["success"] = True + except Exception as e: + test_results["error"] = str(e) + finally: + test_complete.set() # signal test completion + os.kill(os.getpid(), signal.SIGINT) # gracefully term Function + + # Start the test loop asynchronously + test_thread = threading.Thread(target=test) + test_thread.daemon = True # exit when test does + test_thread.start() + + # Serve the handle function + # Note this will fail if not in the main thread due to "set_wakeup_fd" + serve(handle) + + if not test_complete.wait(10): + pytest.fail("Test timed out") + + if not test_results["success"]: + pytest.fail(test_results["error"] or "Test failed") + + +def test_instanced(): + """ + ensures that a user function developed using the default "instanced" + style is served by the middleware + """ + + # User Function + # An example standard "instanced" function (user's Function) which is + # exposed on the network as an ASGI service by the middleware. + class MyFunction: + async def handle(self, scope, receive, send): + # Check if this is a CloudEvent + if "event" not in scope: + await send(CloudEvent({},{"message": "no event in scope"}), 500) + + attributes = { + "type": "com.example.testinstanced", + "source": "https://example.com/event-producer", + } + content = {"message": "OK test_instanced"} + + # The default send encodes the cloudevent using to_structured. + # use send.binary to send it binary encoded, or send.http to use + # the raw ASGI response object without CloudEvent middleware. + await send(CloudEvent(attributes, content)) + + def new(): + return MyFunction() + + # Tests + # Attempts to contact the running function and confirm the user's function + # was instantiated and served. + test_complete = threading.Event() + test_results = {"success": False, "error": None} + + def test(): + try: + wait_for_function() # to become available + + # Send a CloudEvent to the Function + attributes = { + "type": "com.example.test-static", + "source": "https://example.com/event-producer", + } + data = {"message": "test_instanced"} + headers, content = to_structured(CloudEvent(attributes, data)) + response = httpx.post( + f"http://{LISTEN_ADDRESS}", + headers=headers, + content=content + ) + + # Assertions + assert response.status_code == 200 + response_event = json.loads(response.text) + assert response_event["data"]["message"] == "OK test_instanced" + test_results["success"] = True + except Exception as e: + test_results["error"] = str(e) + finally: + test_complete.set() # signal test completion + os.kill(os.getpid(), signal.SIGINT) # gracefully term Function + + # Start the test loop asynchronously + test_thread = threading.Thread(target=test) + test_thread.daemon = True # exit when test does + test_thread.start() + + # Serve the Function + serve(new) + + if not test_complete.wait(10): + pytest.fail("Test timed out") + + if not test_results["success"]: + pytest.fail(test_results["error"] or "Test failed") + + +def test_signal_handling(): + """ + Tests that the server gracefully shuts down when receiving a SIGINT signal. + """ + # Example minimal ASGI app that handles CloudEvents + async def handle(scope, receive, send): + # For the signal handling test, we just need the server to start + # and then shut down gracefully, so we keep the handler simple + await send({ + 'type': 'http.response.start', + 'status': 200, + 'headers': [[b'content-type', b'text/plain']], + }) + await send({ + 'type': 'http.response.body', + 'body': b'Signal Handling OK', + }) + + # Function to send a SIGINT after a delay + def send_signal(): + time.sleep(2) # Allow server to start + os.kill(os.getpid(), signal.SIGINT) + + # Start signal sender in a separate thread + signal_thread = threading.Thread(target=send_signal) + signal_thread.start() + + # Serve the function + try: + serve(handle) + except KeyboardInterrupt: + logging.info("SIGINT received and handled gracefully.") + + signal_thread.join(timeout=5) + + +class FunctionNotAvailableError(Exception): + """ Raised when a Function is not available """ + + +def wait_for_function(): + max_retries = 20 + for i in range(max_retries): + time.sleep(0.5) + try: + httpx.get(f"http://{LISTEN_ADDRESS}/health/liveness") + break + except httpx.ConnectError: + logging.info(f"Retrying ({i+1}/{max_retries})...") + if i >= max_retries: + raise FunctionNotAvailableError(f"Function at {LISTEN_ADDRESS} did not start after {max_retries} attempts") diff --git a/tests/test_http.py b/tests/test_http.py index 45a39914..298590de 100644 --- a/tests/test_http.py +++ b/tests/test_http.py @@ -73,6 +73,7 @@ def test(): test_thread.join(timeout=5) + def test_instanced(): """ ensures that a user function developed using the default "instanced" @@ -134,6 +135,7 @@ def test(): test_thread.join(timeout=5) + def test_signal_handling(): """ Tests that the server gracefully shuts down when receiving a SIGINT signal.