Add aio/aiohttp.py module #658
base: main
Conversation
awscrt/http_asyncio.py
Outdated
async def write_data_async(self,
                           data_stream: Union[InputStream, Any],
                           end_stream: bool = False) -> None:
    """Write data to the stream asynchronously.

    Args:
        data_stream (Union[InputStream, Any]): Data to write.
        end_stream (bool): Whether this is the last data to write.

    Returns:
        None: When the write completes.
    """
    future = self.write_data(data_stream, end_stream)
    await asyncio.wrap_future(future)
This is an async method, but it doesn't look like I can actually pass an async data stream in. I would expect this to be able to take an AsyncIterator[bytes] as a data stream. I would also expect the HTTP/1.1 stream to have this method.

Currently a major problem with using bidirectional streaming in async is that we have to wrap async iterators and treat them like a sync data stream. We can make that work, but it works poorly because the only way to get the CRT to call read later is to throw a particular IO error that gets retried. That retry is immediate, though, so you can end up in a situation where the CRT is running a hot loop on the read method as it waits for data to become available in the background.

To solve this, we really need the CRT to read async, ideally by taking an AsyncIterator, but there might be other ways with Futures to make this an easier transition.
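A minimal sketch of the workaround being described, with an adapter class invented for illustration and BlockingIOError standing in for the specific retryable IO error (the actual error type the CRT retries on isn't named here):

```python
import queue

class AsyncIteratorAsSyncStream:
    """Hypothetical adapter exposing an AsyncIterator[bytes] through the
    blocking read() interface the CRT expects today."""

    def __init__(self):
        # Filled by a background asyncio task that drains the async iterator.
        self._chunks: "queue.Queue[bytes]" = queue.Queue()

    def read(self, size: int = -1) -> bytes:
        try:
            return self._chunks.get_nowait()
        except queue.Empty:
            # Tell the caller to retry later. The CRT retries immediately,
            # which is what produces the hot loop on read().
            raise BlockingIOError()
```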
Thank you!

> Currently a major problem with using bidirectional streaming in async is that we have to wrap async iterators and treat them like a sync data stream. We can make that work, but it works poorly because the only way to get the CRT to call read later is to throw a particular IO error that gets retried. That retry is immediate, though, so you can end up in a situation where the CRT is running a hot loop on the read method as it waits for data to become available in the background.

I'll look into taking AsyncIterator[bytes].

But I think the major problem here has already been resolved by this interface. I updated smithy-python here to use this new interface. Instead of expecting the InputStream to throw the particular IO error, it just keeps providing the data when it's available. I tested with the sample, and CPU usage dropped from 30% to below 10%. Since this API doesn't just loop to read from the input stream, it waits until more input data is provided.
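For illustration, feeding chunks through the new interface might look something like this (a sketch that assumes the write_data_async signature shown above; the send_body helper is invented):

```python
import io
from typing import AsyncIterator

async def send_body(stream, chunks: AsyncIterator[bytes]) -> None:
    # Write each chunk as it becomes available, instead of having the CRT
    # poll a sync read() in a hot loop.
    async for chunk in chunks:
        await stream.write_data_async(io.BytesIO(chunk), end_stream=False)
    # Signal end of body with a final empty write.
    await stream.write_data_async(io.BytesIO(b""), end_stream=True)
```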
awscrt/aio/aiohttp.py
Outdated
async def _set_request_body_generator(self, body_iterator: AsyncIterator[bytes]):
    try:
        async for chunk in body_iterator:
            await self._write_data(io.BytesIO(chunk), False)
Is there a way to do this without wrapping each chunk in a BytesIO? I'm concerned about potential extra copies. It shouldn't happen as long as you do no mutations, since BytesIO starts by just holding a reference. But I'm not sure what APIs are being called.
It should already avoid the extra copy; the BytesIO gets wrapped by our input stream interface, which uses readinto.
https://github.com/awslabs/aws-crt-python/blob/main/awscrt/io.py#L689-L727
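As a quick illustration of why readinto avoids a second copy (this is standard CPython behavior, not code from this PR; CPython shares the initial bytes object until the BytesIO is mutated):

```python
import io

chunk = b"example payload"
body = io.BytesIO(chunk)   # holds a reference to the bytes; no copy on creation
buf = bytearray(4)
n = body.readinto(memoryview(buf))  # copies straight into the caller's buffer
assert bytes(buf[:n]) == b"exam"
```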
awscrt/aio/aiohttp.py
Outdated
async def _set_request_body_generator(self, body_iterator: AsyncIterator[bytes]):
    ...
Is HTTP/1.1 not able to share the implementation?
HTTP/1.1 transfer encoding may need to add trailer headers and chunk extensions, so the API may need to be a bit different. Maybe let the iterator yield a chunk-info struct that can pass extra info like trailers, instead of simple bytes.

Or we can leave trailer headers unsupported for now.

Anyhow, I'd want to leave the HTTP/1.1 implementation for a quick follow-up PR if we want it.
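A rough sketch of what that chunk-info idea could look like; every name here is hypothetical and nothing like it exists in aws-crt-python yet:

```python
from dataclasses import dataclass, field
from typing import AsyncIterator, List, Optional, Tuple

@dataclass
class ChunkInfo:
    data: bytes
    # HTTP/1.1 chunk extensions for this chunk, e.g. [("name", "value")].
    extensions: List[Tuple[str, str]] = field(default_factory=list)
    # Trailer headers; only meaningful on the final chunk.
    trailers: Optional[List[Tuple[str, str]]] = None

async def body() -> AsyncIterator[ChunkInfo]:
    yield ChunkInfo(b"part 1")
    yield ChunkInfo(b"", trailers=[("x-checksum", "abc123")])
```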
awscrt/aio/aiohttp.py
Outdated
bootstrap: Optional[ClientBootstrap] = None,
socket_options: Optional[SocketOptions] = None,
tls_connection_options: Optional[TlsConnectionOptions] = None,
proxy_options: Optional['HttpProxyOptions'] = None) -> "AIOHttpClientConnectionUnified":
why does HttpProxyOptions need to be forward declared?
awscrt/aio/aiohttp.py
Outdated
future.set_result(None)

_awscrt.http2_client_stream_write_data(self, body_stream, end_stream, on_write_complete)
await asyncio.wrap_future(future)
Why do we want to use a concurrent Future here and then wrap it into an asyncio one? Why not use an asyncio Future from the start?
Because the concurrent future is thread-safe, but the asyncio one is not. If we used an asyncio future directly, we would need to invoke self._loop.call_soon_threadsafe() to set it. Either way, it's a bit awkward.

But, yeah, I did use the asyncio future directly in some other places, so I guess it would be nice to keep it consistent.
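For reference, the two patterns being compared look roughly like this (a sketch; the on_write_complete callback signature is assumed, not taken from the PR):

```python
import asyncio
import concurrent.futures

# Pattern in this PR: a thread-safe concurrent future, completed from the
# CRT callback thread and awaited via wrap_future.
def write_with_concurrent_future():
    future = concurrent.futures.Future()

    def on_write_complete(error_code):  # runs on a CRT event-loop thread
        future.set_result(None)

    # ... start the native write, passing on_write_complete ...
    return asyncio.wrap_future(future)  # awaitable on the asyncio loop

# Alternative: an asyncio future, which must be completed through
# call_soon_threadsafe because asyncio futures are not thread-safe.
def write_with_asyncio_future(loop: asyncio.AbstractEventLoop):
    future = loop.create_future()

    def on_write_complete(error_code):  # runs on a CRT event-loop thread
        loop.call_soon_threadsafe(future.set_result, None)

    # ... start the native write, passing on_write_complete ...
    return future
```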
This comment was marked as off-topic.
I don't get what the question is here; I assume you confused this with the aiohttp project.
@@ -7,6 +7,7 @@
    'auth',
    'crypto',
    'http',
    'aio.aiohttp',
nit: aiohttp is an existing library that's already widely used. Would it make more sense for this to just be aio.http, since we already have that precedent with the existing http module?
class AIOHttpClientConnection(AIOHttpClientConnectionUnified):
    """
    An async HTTP/1.1 only client connection.

    Use `AIOHttpClientConnection.new()` to establish a new connection.
    """
What is the intended vision for this connection type? HTTP/1.1 is inherently stateful and we can't multiplex the same way that h2 does. The request() method is also not async, so I'm curious what value this provides in its current state.

Is the idea that you take the stream and then write to it periodically until we get a signal the input is exhausted, then read?
AIOHttpClientConnection is designed to be HTTP/1.1 only. The protocol doesn't support multiplexing, but we support it in a limited sense: the next request/response just waits in a queue to be sent after the previous request/response finishes.

> The request() method is also not async

There is no async work in the request() method itself; it starts the request under the hood, and the stream interface handles the response asynchronously.
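To make the flow concrete, usage might look roughly like this (a sketch built from the names in this PR; the keyword arguments and the awaitable on the stream are assumptions, not the actual API):

```python
async def fetch(request):
    # new() is the async factory added in this PR; arguments are assumed here.
    connection = await AIOHttpClientConnection.new(
        host_name="example.com", port=443)
    # request() returns immediately; it only queues the request.
    stream = connection.request(request)
    # The response then arrives asynchronously through the stream interface
    # (hypothetical awaitable shown).
    status = await stream.response_status()
    return status
```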
Issue #, if available:

- async interface for HTTP functions.

Description of changes:

Connection Classes
- AIOHttpClientConnectionUnified
- AIOHttpClientConnection
  - AIOHttpClientConnection.new() async class method
- AIOHttp2ClientConnection
  - AIOHttp2ClientConnection.new() async class method

Stream Classes
- AIOHttpClientStreamUnified
- AIOHttpClientStream
- AIOHttp2ClientStream
TODO:
- Support incremental body sending via async generators for HTTP/1.1, which will be bound to chunked transfer encoding.
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.