-
Notifications
You must be signed in to change notification settings - Fork 2
Work in progress - switch to eventstreaming #77
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some superficial comments and questions mostly on the builder. I will take a second pass at grokking the session pool management and error handling later.
pub async fn build_with_client<C>(self, client: C) -> QldbResult<QldbDriver<C>> | ||
/// Builds a `QldbDriver` using the AWS SDK for Rust. | ||
/// | ||
/// Note that `config` is the service-specific (QldbSession) config. For |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a way to alias the import to make that clear in the code? Or is the generic sounding Config type just something to get used to?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure, this is a newly introduced change in the SDK. I think there might be something clever I can do with generics to make it so there is only 1 API that can accept either.
/// | ||
/// Note that `config` is the service-specific (QldbSession) config. For | ||
/// shared config, see [`sdk_config`]. | ||
pub async fn config(self, config: Config) -> QldbResult<QldbDriver<DynConnector>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do these functions need to be async?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Setting up the bb8 pool launches a background coroutine that manages connections. In Kotlin speak, this needs to be in the same coroutine context, and thus would have been modeled as an extension function.
One nice thing about Rust coroutines vs Kotlin coroutines is all the await points are obvious:
pub async fn config(self, config: Config) -> QldbResult<QldbDriver<DynConnector>> {
let client = Client::from_conf(config);
self.build_with_client(client).await
// ^^^^^
}
pub async fn build_with_client<C>(self, client: Client<C>) -> QldbResult<QldbDriver<C>>
where
C: SmithyConnector,
{
let ledger_name = self
.ledger_name
.ok_or(error::usage_error("ledger_name must be initialized"))?;
let transaction_retry_policy = Arc::new(Mutex::new(self.transaction_retry_policy));
let session_pool = Pool::builder()
.test_on_check_out(false)
.max_lifetime(None)
.max_size(self.max_concurrent_transactions)
.connection_timeout(Duration::from_secs(10))
.error_sink(Box::new(QldbErrorLoggingErrorSink::new()))
.build(QldbSessionV2Manager::new(client, ledger_name.clone()))
.await // <-------
// ^^^^^
.map_err(|_| error::todo_stable_error_api())?;
Ok(QldbDriver {
ledger_name: Arc::new(ledger_name.clone()),
session_pool: Arc::new(session_pool),
transaction_retry_policy,
})
}
}) | ||
.await?; | ||
.build(QldbSessionV2Manager::new(client, ledger_name.clone())) | ||
.await |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The bb8 doc says that the future completes when the pool has the requested number of connections open. Since you're not specifying min_idle, we will have a pool with zero sessions initially, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't remember now, but yeah I think so.
I was thinking about making APIs that mirror the bb8 pool (i.e. exposing min_idle on our builder), but I'm not sure. Got any thoughts?
.await?; | ||
.build(QldbSessionV2Manager::new(client, ledger_name.clone())) | ||
.await | ||
.map_err(|_| error::todo_stable_error_api())?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is mapping the error variant from bb8 Builder.build
, not an aws sdk error, right?
And map_err results in another Result
, rather than the success variant of the prior Result rather than being some macro which can return an error? I'm a bit lost about how we get from here to building the Ok result with a concrete session_pool.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah. So..
pub async fn build(self, manager: M) -> Result<Pool<M>, M::Error> {
The error variant is the associated Error
type, i.e. this one:
#[async_trait]
impl<C> ManageConnection for QldbSessionV2Manager<C>
where
C: SmithyConnector,
{
type Connection = QldbHttp2Connection;
type Error = ConnectionError;
and ConnectionError
is defined by us to be:
pub type ConnectionError = SdkError<SendCommandError>;
So it's just the underlying SdkError that represents the issue - connection, credentials, invalid endpoint etc.
Previously, I had the carrier infrastructure (?
operator) turn that into QldbError::SdkError
. As per our discussion last week, you convinced me to abstract the SDK errors away. So I just put a todo there to remind myself.
map_err
is a function that turns Result<T, E1>
to Result<T, E2>
(as you said). So it doesn't mess with the pool at all, just the error. The T
itself is the pool, and the carrier trait essentially expands to:
let pool = match stuff() {
Ok(it) => it,
Err(err) => return Err(err).into(),
};
amazon-qldb-driver-core/src/error.rs
Outdated
use aws_smithy_http::operation::BuildError; | ||
use thiserror::Error; | ||
|
||
pub type QldbResult<T> = std::result::Result<T, QldbError>; | ||
pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why include the word Box
in the type name? Isn't the type signature enough for the reader to know this is a Box, if that is important for them to know?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has become idiomatic in libraries. I'm not sure what a better name might be, so I'm copying a pattern I've seen elsewhere. One example is the aws sdk itself.
))?; | ||
let ledger_name = self | ||
.ledger_name | ||
.ok_or(error::usage_error("ledger_name must be initialized"))?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the None
variant is illegal, why use an Option
at all?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a standard idiom with builders. Otherwise you need to make loads of builder structs to model a finite state machine:
struct QldbDriverBuilder;
impl QldbDriverBuilder {
fn new() -> QldbDriverBuilder { QldbDriverBuilder {} }
fn ledger_name(ledger_name: impl Into<String>) -> QldbDriverBuilderStep1 {
QldbDriverBuilderStep1 { ledger_name: ledger_name.into() }
}
}
struct QldbDriverBuilderStep1 {
ledger_name: String
}
// ...
Because this isn't a "true runtime" error (i.e. something that randomly happens), it seems OK to allow for some misuse to escape the compiler.
There are several broad strokes made in this commit: 1. QldbSession v1 is gone entirely. I initially made an attempt to have these live side-by-side, but they're just so different. If there is a desire to allow runtime switching of underlying drivers, we can bring that back in a future commit. 2. In particular, having QldbError::SdkError was a huge mistake. Going forward, we're going to abstract the SDK errors entirely rather than couple the transport and application layers. 3. Eventstreaming is used in a 1:1 fashion only - 1 request, 1 response. In the future, we may have concurrent request-responses, but we will need correlation ids to support that. 4. I've entirely punted on the user errors and retries for now. 5. The support for dynamic SDKs has been removed. I think it's useful to get this back, but I want to try get it done using the Smithy patterns rather than the trait stuff. As a result, a bunch of indirection is gone which made this much easiser to write. Of note, I really like how the `<C>` parameter just went away - especially in the user-facing `TransactionAttempt<C>`. This is really nice. I was also pleased how the bb8 pool abstraction actually made a very natural place to stash the input/output channels. There is no testing yet.
The driver no longer needs this as eventstreaming takes care of these concerns. We may need to offer QldbHash as its own library, e.g. to offer client-side computation of the hash chain.
The doctests don't pass :(
This lets us move the PooledConnection into TransactionAttempt. Having that type have a lifetime is really painful. At this point, the doctest passes, but it's ugly!
The test fails with a timeout because the DVR isn't loaded with any precanned request-response pairs!
Breaking change - call `bufferred().await?` to explicitly load all values into memory.
This extracts the interaction logic to results.rs and makes poll_next drag in stats under the covers. Lots of commenting too!
Use buffered(), move into -core
This commit removes the distinction between the -core crate and the driver facade. Initially the idea was to have a "pure" implementation that didn't have any networking stuff so that we could compile to wasm and dependency inject different SDKs. However, we're not actually doing that yet and this indirection wasn't helping us move quickly. Furthermore, the new SDK has taken the directly where the toplevel client isn't a trait. Rather, SmithyConnector (1 layer down) allows for pluggable connectivity. This is really nice, and means that when we re-pursue the wasm directly we might not even need this two-crate approach anyway!
unique.rs showed that when using tokio::spawn, the future needs to be Send, so any suspend points (resume types) need to be Send too. This meant that statement results (which are generic over E) failed the bounds checks.
Flesh out integration test to support recording and include an example that worked for me. (The test currently fails, not sure why yet.)
I think the reason this test is failing is related to awslabs/aws-sdk-rust#296
I think it's going to be common for applications to have a single error type. The new pattern makes Infallible the default variant (read: "no custom user error"), and `.with_user_error` can be used to customize the variant.
No description provided.