Skip to content

Commit 1123eea

Browse files
authored
feat: add execution_id support for async stack (#377)
* feat: add execution_id support for async stack - Add contextvars support to execution_id.py for async-safe context storage - Create AsgiMiddleware class to inject execution_id into ASGI requests - Add set_execution_context_async decorator for both sync and async functions - Update LoggingHandlerAddExecutionId to support both Flask g and contextvars - Integrate execution_id support in aio/__init__.py with proper exception handling - Add comprehensive async tests matching sync test functionality - Follow Starlette best practices for exception handling The implementation enables automatic execution_id injection and logging for async functions when LOG_EXECUTION_ID=true, matching the existing sync stack behavior. * refactor: move exception logging to crash handler for cleaner code - Remove try/catch blocks from wrapper functions - Centralize exception logging in _crash_handler - Extract execution_id directly from request headers in crash handler - Temporarily set context when logging exceptions to ensure execution_id is included - This approach is cleaner and more similar to Flask's centralized exception handling * refactor: improve code organization based on feedback - Move imports to top of file instead of inside functions - Extract common header parsing logic into _extract_context_from_headers helper - Reduce code duplication between sync and async decorators - Add comment explaining why crash handler needs to extract context from headers - This addresses the context reset issue where decorators clean up before exception handlers run * fix: preserve execution context for exception handlers - Don't reset context on exception, only on successful completion - This allows exception handlers to access execution_id naturally - Simplify crash handler since context is now available - Rely on Python's automatic contextvar cleanup when task completes - Each request runs in its own task, so no risk of context leakage This is more correct and follows the principle that context should be available throughout the entire request lifecycle, including error handling. * style: apply black and isort formatting - Format code with black for consistent style - Sort imports with isort for better organization - All linting checks now pass * refactor: clean up async tests and remove redundant comments * chore: remove uv.lock from version control * style: fix black formatting * fix: skip async execution_id tests on Python 3.7 * refactor: reuse _enable_execution_id_logging from main module * chore: more cleanup. * test: remove unnecessary pragma no cover for sync_wrapper * test: improve coverage by removing unnecessary pragma no cover annotations * style: fix black formatting * style: fix isort import ordering * test: add back pragma no cover for genuinely hard-to-test edge cases * refactor: simplify async decorator by removing dead code branch * fix: exclude async-specific code from py37 coverage The AsgiMiddleware class and set_execution_context_async function in execution_id.py require Python 3.8+ due to async dependencies. These are now excluded from coverage calculations in Python 3.7 environments. * fix: improve async execution ID context propagation using contextvars - Use contextvars.copy_context() to properly propagate execution context in async functions - Implement AsyncExecutionIdHandler to handle JSON logging with execution_id - Redirect logging output from stderr to stdout for consistency - Add build dependency to dev dependencies - Update tests to reflect new logging output location * feat: Add execution ID logging for async functions Refactors the async logging implementation to align with the sync version, ensuring consistent execution ID logging across both stacks. * chore: clean up impl. * refactor: define custom exception handling middleware to avoid duplicate log of traceback. * style: run black * chore: clean up code a little more. * fix: propagate context in ce fns. * style: more nits. * chore: remove unncessary debug flag. * fix: respond to PR comments * fix: respond to more PR comments
1 parent 268acf1 commit 1123eea

File tree

9 files changed

+665
-33
lines changed

9 files changed

+665
-33
lines changed

.coveragerc-py37

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,8 @@ exclude_lines =
1818
# Don't complain about async-specific imports and code
1919
from functions_framework.aio import
2020
from functions_framework._http.asgi import
21-
from functions_framework._http.gunicorn import UvicornApplication
21+
from functions_framework._http.gunicorn import UvicornApplication
22+
23+
# Exclude async-specific classes and functions in execution_id.py
24+
class AsgiMiddleware:
25+
def set_execution_context_async

conftest.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,12 @@ def pytest_ignore_collect(collection_path, config):
5050
if sys.version_info >= (3, 8):
5151
return None
5252

53-
# Skip test_aio.py and test_asgi.py entirely on Python 3.7
54-
if collection_path.name in ["test_aio.py", "test_asgi.py"]:
53+
# Skip test_aio.py, test_asgi.py, and test_execution_id_async.py entirely on Python 3.7
54+
if collection_path.name in [
55+
"test_aio.py",
56+
"test_asgi.py",
57+
"test_execution_id_async.py",
58+
]:
5559
return True
5660

5761
return None

pyproject.toml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,3 +61,13 @@ functions_framework = ["py.typed"]
6161

6262
[tool.setuptools.package-dir]
6363
"" = "src"
64+
65+
[dependency-groups]
66+
dev = [
67+
"black>=23.3.0",
68+
"build>=1.1.1",
69+
"isort>=5.11.5",
70+
"pretend>=1.0.9",
71+
"pytest>=7.4.4",
72+
"pytest-asyncio>=0.21.2",
73+
]

src/functions_framework/aio/__init__.py

Lines changed: 97 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,24 @@
1313
# limitations under the License.
1414

1515
import asyncio
16+
import contextvars
1617
import functools
1718
import inspect
19+
import logging
20+
import logging.config
1821
import os
22+
import traceback
1923

2024
from typing import Any, Awaitable, Callable, Dict, Tuple, Union
2125

2226
from cloudevents.http import from_http
2327
from cloudevents.http.event import CloudEvent
2428

25-
from functions_framework import _function_registry
29+
from functions_framework import (
30+
_enable_execution_id_logging,
31+
_function_registry,
32+
execution_id,
33+
)
2634
from functions_framework.exceptions import (
2735
FunctionsFrameworkException,
2836
MissingSourceException,
@@ -31,6 +39,7 @@
3139
try:
3240
from starlette.applications import Starlette
3341
from starlette.exceptions import HTTPException
42+
from starlette.middleware import Middleware
3443
from starlette.requests import Request
3544
from starlette.responses import JSONResponse, Response
3645
from starlette.routing import Route
@@ -96,29 +105,27 @@ def wrapper(*args, **kwargs):
96105
return wrapper
97106

98107

99-
async def _crash_handler(request, exc):
100-
headers = {_FUNCTION_STATUS_HEADER_FIELD: _CRASH}
101-
return Response(f"Internal Server Error: {exc}", status_code=500, headers=headers)
102-
103-
104-
def _http_func_wrapper(function, is_async):
108+
def _http_func_wrapper(function, is_async, enable_id_logging=False):
109+
@execution_id.set_execution_context_async(enable_id_logging)
105110
@functools.wraps(function)
106111
async def handler(request):
107112
if is_async:
108113
result = await function(request)
109114
else:
110115
# TODO: Use asyncio.to_thread when we drop Python 3.8 support
111-
# Python 3.8 compatible version of asyncio.to_thread
112116
loop = asyncio.get_event_loop()
113-
result = await loop.run_in_executor(None, function, request)
117+
ctx = contextvars.copy_context()
118+
result = await loop.run_in_executor(None, ctx.run, function, request)
114119
if isinstance(result, str):
115120
return Response(result)
116121
elif isinstance(result, dict):
117122
return JSONResponse(result)
118123
elif isinstance(result, tuple) and len(result) == 2:
119-
# Support Flask-style tuple response
120124
content, status_code = result
121-
return Response(content, status_code=status_code)
125+
if isinstance(content, dict):
126+
return JSONResponse(content, status_code=status_code)
127+
else:
128+
return Response(content, status_code=status_code)
122129
elif result is None:
123130
raise HTTPException(status_code=500, detail="No response returned")
124131
else:
@@ -127,7 +134,8 @@ async def handler(request):
127134
return handler
128135

129136

130-
def _cloudevent_func_wrapper(function, is_async):
137+
def _cloudevent_func_wrapper(function, is_async, enable_id_logging=False):
138+
@execution_id.set_execution_context_async(enable_id_logging)
131139
@functools.wraps(function)
132140
async def handler(request):
133141
data = await request.body()
@@ -142,9 +150,9 @@ async def handler(request):
142150
await function(event)
143151
else:
144152
# TODO: Use asyncio.to_thread when we drop Python 3.8 support
145-
# Python 3.8 compatible version of asyncio.to_thread
146153
loop = asyncio.get_event_loop()
147-
await loop.run_in_executor(None, function, event)
154+
ctx = contextvars.copy_context()
155+
await loop.run_in_executor(None, ctx.run, function, event)
148156
return Response("OK")
149157

150158
return handler
@@ -154,6 +162,64 @@ async def _handle_not_found(request: Request):
154162
raise HTTPException(status_code=404, detail="Not Found")
155163

156164

165+
def _configure_app_execution_id_logging():
166+
logging.config.dictConfig(
167+
{
168+
"version": 1,
169+
"handlers": {
170+
"asgi": {
171+
"class": "logging.StreamHandler",
172+
"stream": "ext://functions_framework.execution_id.logging_stream",
173+
},
174+
},
175+
"root": {"level": "INFO", "handlers": ["asgi"]},
176+
}
177+
)
178+
179+
180+
class ExceptionHandlerMiddleware:
181+
def __init__(self, app):
182+
self.app = app
183+
184+
async def __call__(self, scope, receive, send):
185+
if scope["type"] != "http": # pragma: no cover
186+
await self.app(scope, receive, send)
187+
return
188+
189+
try:
190+
await self.app(scope, receive, send)
191+
except Exception as exc:
192+
logger = logging.getLogger()
193+
tb_lines = traceback.format_exception(type(exc), exc, exc.__traceback__)
194+
tb_text = "".join(tb_lines)
195+
196+
path = scope.get("path", "/")
197+
method = scope.get("method", "GET")
198+
error_msg = f"Exception on {path} [{method}]\n{tb_text}".rstrip()
199+
200+
logger.error(error_msg)
201+
202+
headers = [
203+
[b"content-type", b"text/plain"],
204+
[_FUNCTION_STATUS_HEADER_FIELD.encode(), _CRASH.encode()],
205+
]
206+
207+
await send(
208+
{
209+
"type": "http.response.start",
210+
"status": 500,
211+
"headers": headers,
212+
}
213+
)
214+
await send(
215+
{
216+
"type": "http.response.body",
217+
"body": b"Internal Server Error",
218+
}
219+
)
220+
# Don't re-raise to prevent starlette from printing traceback again
221+
222+
157223
def create_asgi_app(target=None, source=None, signature_type=None):
158224
"""Create an ASGI application for the function.
159225
@@ -175,14 +241,19 @@ def create_asgi_app(target=None, source=None, signature_type=None):
175241
)
176242

177243
source_module, spec = _function_registry.load_function_module(source)
244+
245+
enable_id_logging = _enable_execution_id_logging()
246+
if enable_id_logging:
247+
_configure_app_execution_id_logging()
248+
178249
spec.loader.exec_module(source_module)
179250
function = _function_registry.get_user_function(source, source_module, target)
180251
signature_type = _function_registry.get_func_signature_type(target, signature_type)
181252

182253
is_async = inspect.iscoroutinefunction(function)
183254
routes = []
184255
if signature_type == _function_registry.HTTP_SIGNATURE_TYPE:
185-
http_handler = _http_func_wrapper(function, is_async)
256+
http_handler = _http_func_wrapper(function, is_async, enable_id_logging)
186257
routes.append(
187258
Route(
188259
"/",
@@ -202,7 +273,9 @@ def create_asgi_app(target=None, source=None, signature_type=None):
202273
)
203274
)
204275
elif signature_type == _function_registry.CLOUDEVENT_SIGNATURE_TYPE:
205-
cloudevent_handler = _cloudevent_func_wrapper(function, is_async)
276+
cloudevent_handler = _cloudevent_func_wrapper(
277+
function, is_async, enable_id_logging
278+
)
206279
routes.append(
207280
Route("/{path:path}", endpoint=cloudevent_handler, methods=["POST"])
208281
)
@@ -221,10 +294,14 @@ def create_asgi_app(target=None, source=None, signature_type=None):
221294
f"Unsupported signature type for ASGI server: {signature_type}"
222295
)
223296

224-
exception_handlers = {
225-
500: _crash_handler,
226-
}
227-
app = Starlette(routes=routes, exception_handlers=exception_handlers)
297+
app = Starlette(
298+
routes=routes,
299+
middleware=[
300+
Middleware(ExceptionHandlerMiddleware),
301+
Middleware(execution_id.AsgiMiddleware),
302+
],
303+
)
304+
228305
return app
229306

230307

0 commit comments

Comments
 (0)