
Conversation

eulersIDcrisis
Contributor

This PR adds the ability for the streaming_callback of AsyncHTTPClient to be a coroutine. In addition to the small change itself, it adds a nominal test to verify that things work as expected, at least as well as the streaming_callback approach permits.
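A minimal sketch of the resulting usage (the URL and callback body are illustrative, not taken from this PR):

import asyncio

from tornado.httpclient import AsyncHTTPClient

async def main():
    client = AsyncHTTPClient()

    async def on_chunk(chunk: bytes) -> None:
        # Awaiting here suspends the fetch until the chunk has been
        # handled, which is the behavior this PR enables.
        await asyncio.sleep(0)  # stand-in for real async work

    # streaming_callback may now be a coroutine function.
    await client.fetch("http://example.com/", streaming_callback=on_chunk)

asyncio.run(main())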

This PR is intended to address a few separate issues listed below:

eulersIDcrisis marked this pull request as ready for review March 26, 2025 18:22
@eulersIDcrisis
Contributor Author

@bdarnell , if it is any consolation, I've run tox which passes for me (including the docs and linter checks).

@eulersIDcrisis
Contributor Author

I guess one other consideration for this PR: pycurl might not support async streaming callbacks, but perhaps an assertion can or should be added for that case?

@bdarnell
Member

bdarnell commented Aug 5, 2025

Thank you for your patience with this; it originally came in as I was trying to finalize some things for the 6.5 release and I'm just now coming back to it. I like how simply it fits in with the existing coroutine support of data_received.

It is possible but more difficult to support streaming callbacks on curl_httpclient. I'd be OK with just returning an error if streaming_callback is awaitable in that case.

Do we have any precedent for passing an asynchronous function as a callback? It feels a little different from Tornado's current interface designs. If I was designing this from scratch I'd probably have something like

resp = await client.fetch(url)
# resp has response code, headers, etc
async for chunk in resp.body:
    ...

But I think that's going too far away from the way things work today. An intermediate step might be to populate a Queue which the caller could read from. But that still feels kind of clunky to use and I'm not sure it's any better than what you have here.

The test isn't telling us much because there's no way to tell whether or when the callback resumes after waiting for gen.moment. I've tested this in the past with elaborate side channels between clients and servers to ensure that the second chunk of the response isn't generated until the first one has been received. I'm open to simpler things here (the more elaborate things are already tested at the HTTPServer level), but I'm not sure what exactly that would look like.

@eulersIDcrisis
Contributor Author

eulersIDcrisis commented Sep 9, 2025

It is possible but more difficult to support streaming callbacks on curl_httpclient. I'd be OK with just returning an error if streaming_callback is awaitable in that case.

I can look into this or otherwise raise an error. I just did not want to add complexity where it wasn't needed.

Do we have any precedent for passing an asynchronous function as a callback? It feels a little different from Tornado's current interface designs. If I was designing this from scratch I'd probably have something like

resp = await client.fetch(url)
# resp has response code, headers, etc
async for chunk in resp.body:
    ...

Yes, I agree. This is the interface that httpx provides. This type of interface also becomes possible with a small adapter function, without too much work. A loose coding example:

import asyncio
from typing import AsyncGenerator

from tornado.httpclient import AsyncHTTPClient

async def request_streamer(url: str) -> AsyncGenerator[bytes, None]:
    client = AsyncHTTPClient()  # Initialized however.
    # Set a maximum size on the queue to bound it; with an awaitable
    # streaming_callback, a full queue suspends the fetch (backpressure).
    body: asyncio.Queue = asyncio.Queue(maxsize=1)
    request_task = client.fetch(url, streaming_callback=body.put)
    data_task = asyncio.create_task(body.get())
    pending = {request_task, data_task}
    while True:
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for fut in done:
            if fut is request_task:
                # Yield the chunk the pending get() may already hold,
                # then drain anything left in the queue.
                if data_task.done():
                    yield data_task.result()
                else:
                    data_task.cancel()
                while not body.empty():
                    yield body.get_nowait()
                # Let any exceptions raise.
                fut.result()
                return
            elif fut is data_task:
                yield fut.result()
                data_task = asyncio.create_task(body.get())
                pending.add(data_task)

Anyway, this probably needs to be tuned (edit: I tailored this example to work a bit better), but it is simple enough to provide a similar interface without adding a dependency on httpx.

But I think that's going too far away from the way things work today. An intermediate step might be to populate a Queue which the caller could read from. But that still feels kind of clunky to use and I'm not sure it's any better than what you have here.

I think there is still value in this; you have already proposed the use of a Queue here (#3351 (comment)), and even so, this still doesn't always work because it provides no throttling with asymmetric network speeds: unless you can await when putting to the queue, the queue can grow unbounded if the producer (here the AsyncHTTPClient) adds much faster than the consumer (here a downstream client) can serve it.

Part of my interest in this PR came from a case where the main tornado server proxied content from a request to an internal service inside the local network. The client request was usually much slower, but when the file being transferred was large (~1GB), the route would regularly cause Python's memory usage to grow far higher. Run more than one of these requests concurrently and you have serious memory problems, which we observed on production servers. Adding the ability to simply await when adding to the queue at least solves the memory issue.
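To make the backpressure point concrete, a minimal sketch (the names are assumed for illustration): with an awaitable streaming_callback, a full bounded queue suspends the HTTP client until the consumer catches up.

import asyncio

# A bounded queue: put() blocks once maxsize chunks are waiting.
body: asyncio.Queue = asyncio.Queue(maxsize=1)

async def on_chunk(chunk: bytes) -> None:
    # With a plain synchronous callback this would have to be
    # put_nowait() on an unbounded queue; awaiting put() is what
    # keeps memory bounded when the consumer is slower.
    await body.put(chunk)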

The test isn't telling us much because there's no way to tell whether or when the callback resumes after waiting for gen.moment.

Partially true. The test verifies that either a standard callback or a coroutine can successfully be passed to the client without issue. I agree that it doesn't deal with the finer points of how the callback is expected to behave. If you want me to add more tests, I can when I have more time.

@bdarnell
Member

It is possible but more difficult to support streaming callbacks on curl_httpclient. I'd be OK with just returning an error if streaming_callback is awaitable in that case.
I can look into this or otherwise raise an error. I just did not want to add complexity where it wasn't needed.

Raising an error is fine.
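For example, a guard along these lines (the function name, hook point, and exact check are assumptions for illustration, not the merged code):

import inspect

from tornado.httpclient import HTTPRequest

def check_streaming_callback(request: HTTPRequest) -> None:
    # Hypothetical guard for curl_httpclient: reject awaitable callbacks
    # up front rather than failing inside the curl download loop.
    cb = request.streaming_callback
    if cb is not None and inspect.iscoroutinefunction(cb):
        raise ValueError(
            "coroutine streaming_callback is not supported by curl_httpclient"
        )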

I think there is still value in this; you have already proposed the use of a Queue here (#3351 (comment)), and even so, this still doesn't always work because it provides no throttling with asymmetric network speeds: unless you can await when putting to the queue, the queue can grow unbounded if the producer (here the AsyncHTTPClient) adds much faster than the consumer (here a downstream client) can serve it.

Right. What I suggested in the linked comment, populating a queue from streaming_callback, wouldn't work for this reason; you'd need some way to push back on the HTTP client. What I was thinking of here was something more integrated: when you fetch in "queue mode", you don't pass a streaming_callback, but instead a Queue is provided for you to read from. The HTTP client would internally handle awaiting as necessary without changing the public streaming_callback interface. But as I write this up it doesn't seem very appealing in comparison to the more straightforward change to allow asynchronous streaming_callback.
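For illustration only, the shape that integrated mode might have taken (fetch_streaming is an invented name; nothing like it exists in Tornado):

# Purely hypothetical API sketch: the client would await put() on a
# bounded internal queue, so the reader's pace throttles the download.
async def consume(client, url: str) -> None:
    response, body_queue = await client.fetch_streaming(url)  # invented
    while True:
        chunk = await body_queue.get()
        if chunk is None:  # sentinel: end of body
            break
        print(len(chunk))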

Partially true. The test verifies that either a standard callback, or a coroutine can successfully be passed to the client without issue. I agree that it doesn't deal with the finer points of how the callback is expected to behave. If you want me to add more tests, I can when I have more time.

Let's do three small changes to the test:

  1. Use async def instead of @gen.coroutine. Aside from being the more modern approach, native coroutines are started lazily and therefore expose a few more edge cases.
  2. Append to chunk_bytes after await gen.moment instead of before (see the sketch after this list). Again, this ensures that the coroutine actually runs across multiple await points.
  3. Fetch /chunk instead of /hello (I think that's wired up in this test already). This will give you two entries in chunk_bytes instead of one.
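Roughly, the revised callback would then look like this (a sketch of the suggested shape, not the merged test):

from tornado import gen

chunk_bytes = []

async def streaming_callback(chunk: bytes) -> None:
    # Suspend first, so the client has to resume the coroutine before
    # the chunk is recorded; appending after the await is what exercises
    # the lazy-start edge case.
    await gen.moment
    chunk_bytes.append(chunk)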

@eulersIDcrisis
Contributor Author

eulersIDcrisis commented Sep 17, 2025

Updated the test here: 82f9209

I also added a test to verify that an exception is raised for coroutine callbacks with curl_httpclient for now, here: 4632067 and here: 728fb84

@bdarnell
Member

Looks good! Thanks for your patience with this.

@bdarnell bdarnell merged commit d30ef74 into tornadoweb:master Sep 17, 2025
15 checks passed