Open
Description
Describe the bug: Hi, I'm trying to integrate my app with ElasticAPM and getting strange behaviour around langgraph functionality when ElasticAPM middleware is added (TypeError: AsyncCursorProxy.execute() got an unexpected keyword argument 'binary'
).
To Reproduce
- You need to have
AsyncConnectionPool
instrumented like this:
@asynccontextmanager
async def lifespan(app: FastAPI):
async with AsyncConnectionPool(
conninfo=settings.db.connection_string,
max_size=settings.db.connection_pool_size,
timeout=settings.db.connection_timeout,
kwargs={
"autocommit": True,
"row_factory": "dict_row",
"prepare_threshold": 0
}) as pool:
await pool.wait()
app.async_connection_pool = pool
yield
await app.async_connection_pool.close()
- You need to have langgraph checkpointer set like so:
async def get_agent_memory(request: Request):
async with request.app.async_connection_pool.connection() as connection:
checkpointer = AsyncPostgresSaver(connection)
await checkpointer.setup()
yield checkpointer
- You need to have
StateGraph
instrumented and compiled like the following (most of the custom code is omitted):
...
graph = StateGraph(AgentState)
...
self.agent = graph.compile(checkpointer=checkpointer)
- Next, when you need to instrument
FastAPI
app withElasticAPM
middleware like so:
...
app = FastAPI(lifespan=lifespan)
app.add_middleware(ElasticAPM, client=apm_client)
...
- Finally, you need to call the agent. In my case I do have the following calls in my code (most of the custom code is omitted):
...
await self.agent.aget_state(config)
...
await self.agent.ainvoke(Command(resume=_input), config)
...
await self.agent.ainvoke(_input, config)
...
All of this leads to the TypeError: AsyncCursorProxy.execute() got an unexpected keyword argument 'binary'
(see traceback below).
Notes:
- If the
app.add_middleware(ElasticAPM, client=apm_client)
is commented out, then everything works just fine. - Also, if the checkpointer is simple
MemorySaver
, then theElasticAPM
middleware does not break the flow so it looks like the combination ofFastAPI
,langgraph
,AsyncPostgresSaver
andElasticAPM
middleware makes the deal.
Environment (please complete the following information)
- OS: Linux, MacOS
- Python version: 3.13.3
- Framework and version: FastAPI 0.115.8
- APM Server version: 8.17.5
- Agent version: 6.23.0
Additional context
Traceback
Traceback (most recent call last):
File "/Users/user_name/src/my_app/.venv/lib/python3.13/site-packages/uvicorn/protocols/http/h11_impl.py", line 403, in run_asgi
result = await app( # type: ignore[func-returns-value]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
self.scope, self.receive, self.send
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
)
^
File "/Users/user_name/src/my_app/.venv/lib/python3.13/site-packages/uvicorn/middleware/proxy_headers.py", line 60, in __call__
return await self.app(scope, receive, send)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/user_name/src/my_app/.venv/lib/python3.13/site-packages/fastapi/applications.py", line 1054, in __call__
await super().__call__(scope, receive, send)
File "/Users/user_name/src/my_app/.venv/lib/python3.13/site-packages/starlette/applications.py", line 112, in __call__
await self.middleware_stack(scope, receive, send)
File "/Users/user_name/src/my_app/.venv/lib/python3.13/site-packages/elasticapm/instrumentation/packages/asyncio/starlette.py", line 48, in call
return await wrapped(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/user_name/src/my_app/.venv/lib/python3.13/site-packages/starlette/middleware/errors.py", line 187, in __call__
raise exc
File "/Users/user_name/src/my_app/.venv/lib/python3.13/site-packages/starlette/middleware/errors.py", line 165, in __call__
await self.app(scope, receive, _send)
File "/Users/user_name/src/my_app/.venv/lib/python3.13/site-packages/elasticapm/contrib/starlette/__init__.py", line 199, in __call__
await self.app(scope, _request_receive or receive, wrapped_send)
File "/Users/user_name/src/my_app/.venv/lib/python3.13/site-packages/starlette/middleware/exceptions.py", line 62, in __call__
await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
File "/Users/user_name/src/my_app/.venv/lib/python3.13/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
raise exc
File "/Users/user_name/src/my_app/.venv/lib/python3.13/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
await app(scope, receive, sender)
File "/Users/user_name/src/my_app/.venv/lib/python3.13/site-packages/starlette/routing.py", line 714, in __call__
await self.middleware_stack(scope, receive, send)
File "/Users/user_name/src/my_app/.venv/lib/python3.13/site-packages/starlette/routing.py", line 734, in app
await route.handle(scope, receive, send)
File "/Users/user_name/src/my_app/.venv/lib/python3.13/site-packages/starlette/routing.py", line 288, in handle
await self.app(scope, receive, send)
File "/Users/user_name/src/my_app/.venv/lib/python3.13/site-packages/starlette/routing.py", line 76, in app
await wrap_app_handling_exceptions(app, request)(scope, receive, send)
File "/Users/user_name/src/my_app/.venv/lib/python3.13/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
raise exc
File "/Users/user_name/src/my_app/.venv/lib/python3.13/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
await app(scope, receive, sender)
File "/Users/user_name/src/my_app/.venv/lib/python3.13/site-packages/starlette/routing.py", line 73, in app
response = await f(request)
^^^^^^^^^^^^^^^^
File "/Users/user_name/src/my_app/.venv/lib/python3.13/site-packages/fastapi/routing.py", line 301, in app
raw_response = await run_endpoint_function(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
...<3 lines>...
)
^
File "/Users/user_name/src/my_app/.venv/lib/python3.13/site-packages/fastapi/routing.py", line 212, in run_endpoint_function
return await dependant.call(**values)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/user_name/src/my_app/src/api/routes/routes.py", line 27, in message_handler
response = await graph.send_message(...)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/user_name/src/my_app/src/services/agents/graph.py", line 24, in send_message
return await self.my_agent.invoke(...)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/user_name/src/my_app/src/services/agents/my_agent.py", line 75, in invoke
state = await self.agent.aget_state(config)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/user_name/src/my_app/.venv/lib/python3.13/site-packages/langgraph/pregel/__init__.py", line 1019, in aget_state
saved = await checkpointer.aget_tuple(config)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/user_name/src/my_app/.venv/lib/python3.13/site-packages/langgraph/checkpoint/postgres/aio.py", line 187, in aget_tuple
await cur.execute(
~~~~~~~~~~~^
self.SELECT_SQL + where,
^^^^^^^^^^^^^^^^^^^^^^^^
args,
^^^^^
binary=True,
^^^^^^^^^^^^
)
^
-
Agent config options
Click to expand
apm_client = make_apm_client({ 'SERVICE_NAME': 'my-app', 'SERVER_URL': '<server-url>', 'GLOBAL_LABELS': 'project=my-project,service_type=app', 'ENVIRONMENT': 'dev' })
-
requirements.txt
(in my case Poetry'spyproject.toml
):Click to expand
... dependencies = [ "langgraph (>=0.2.74,<0.3.0)", "langchain-core (>=0.3.37,<0.4.0)", "langchain-openai (>=0.3.6,<0.4.0)", "langgraph-checkpoint-postgres (>=2.0.15,<3.0.0)", "dacite (>=1.9.2,<2.0.0)", "psycopg-pool (>=3.2.5,<4.0.0)", "fastapi (>=0.115.8,<0.116.0)", "pydantic-settings (>=2.8.0,<3.0.0)", "uvicorn (>=0.34.0,<0.35.0)", "pyjwt (==2.10.1)", "postgres (>=4.0,<5.0)", "psycopg[binary,pool] (>=3.2.5,<4.0.0)", "grandalf (>=0.8,<0.9)", "pyhumps (>=3.8.0,<4.0.0)", "google-cloud-logging (>=3.12.0,<4.0.0)", "sqlmodel (>=0.0.24,<0.0.25)", "sqlalchemy[asyncio] (>=2.0.40,<3.0.0)", "asyncpg (>=0.30.0,<0.31.0)", "yattag (>=1.16.1,<2.0.0)", "elastic-apm (>=6.23.0,<7.0.0)", ] ...