From 7b71049211e3341fa55d228ad5527b38814f2874 Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Tue, 7 Jan 2025 07:53:57 +0100 Subject: [PATCH] WIP: Adding some signal handling (#27) Adding some signal handling Signed-off-by: Matthias Wessendorf --- src/func_python/http.py | 20 ++++++++++++++++---- tests/test_http.py | 38 +++++++++++++++++++++++++++++++++++--- 2 files changed, 51 insertions(+), 7 deletions(-) diff --git a/src/func_python/http.py b/src/func_python/http.py index 4b01ea89..767611e3 100644 --- a/src/func_python/http.py +++ b/src/func_python/http.py @@ -2,6 +2,7 @@ import asyncio import logging import os +import signal import hypercorn.config import hypercorn.asyncio @@ -10,7 +11,6 @@ logging.basicConfig(level=DEFAULT_LOG_LEVEL) - def serve(f): """serve a function f by wrapping it in an ASGI web application and starting. The function can be either a constructor for a functon @@ -46,6 +46,7 @@ async def __call__(self, scope, receive, send): class ASGIApplication(): def __init__(self, f): self.f = f + self.stop_event = asyncio.Event() # Inform the user via logs that defaults will be used for health # endpoints if no matchin methods were provided. if hasattr(self.f, "alive") is not True: @@ -67,7 +68,18 @@ def serve(self): cfg.bind = [os.getenv('LISTEN_ADDRESS', DEFAULT_LISTEN_ADDRESS)] logging.debug(f"function starting on {cfg.bind}") - return asyncio.run(hypercorn.asyncio.serve(self, cfg)) + return asyncio.run(self._serve(cfg)) + + async def _serve(self, cfg): + loop = asyncio.get_event_loop() + loop.add_signal_handler(signal.SIGINT, self._handle_signal) + loop.add_signal_handler(signal.SIGTERM, self._handle_signal) + + await hypercorn.asyncio.serve(self, cfg) + + def _handle_signal(self): + logging.info("Signal received: initiating shutdown") + self.stop_event.set() async def on_start(self): """on_start handles the ASGI server start event, delegating control @@ -82,8 +94,8 @@ async def on_stop(self): self.f.stop() else: logging.info("function does not implement 'stop'. Skipping.") + self.stop_event.set() - # Register ASGIFunctoin as a callable ASGI Function async def __call__(self, scope, receive, send): if scope['type'] == 'lifespan': while True: @@ -101,7 +113,7 @@ async def __call__(self, scope, receive, send): # Assert request is HTTP if scope["type"] != "http": await send_exception(send, 400, - "Functions currenly only support ASGI/HTTP " + "Functions currently only support ASGI/HTTP " f"connections. Got {scope['type']}" ) return diff --git a/tests/test_http.py b/tests/test_http.py index 94a2b57c..11031353 100644 --- a/tests/test_http.py +++ b/tests/test_http.py @@ -14,7 +14,7 @@ LISTEN_ADDRESS = os.getenv("LISTEN_ADDRESS") def test_static(): - """ + """ ensures that a user function developed using the default "static" style (method signature) is served by the middleware. """ @@ -71,9 +71,8 @@ def test(): test_thread.join(timeout=5) - def test_instanced(): - """ + """ ensures that a user function developed using the default "instanced" style is served by the middleware """ @@ -132,3 +131,36 @@ def test(): logging.info("signal received") test_thread.join(timeout=5) + +def test_signal_handling(): + """ + Tests that the server gracefully shuts down when receiving a SIGINT signal. + """ + # Example minimal ASGI app + async def handle(scope, receive, send): + await send({ + 'type': 'http.response.start', + 'status': 200, + 'headers': [[b'content-type', b'text/plain']], + }) + await send({ + 'type': 'http.response.body', + 'body': b'Signal Handling OK', + }) + + # Function to send a SIGINT after a delay + def send_signal(): + time.sleep(2) # Allow server to start + os.kill(os.getpid(), signal.SIGINT) + + # Start signal sender in a separate thread + signal_thread = threading.Thread(target=send_signal) + signal_thread.start() + + # Serve the function + try: + serve(handle) + except KeyboardInterrupt: + logging.info("SIGINT received and handled gracefully.") + + signal_thread.join(timeout=5)