Skip to content

Commit 7687ecf

Browse files
committed
Erase SmithyConnector
This lets us move the PooledConnection into TransactionAttempt. Having that type have a lifetime is really painful. At this point, the doctest passes, but it's ugly!
1 parent 0b0f09e commit 7687ecf

File tree

5 files changed

+37
-56
lines changed

5 files changed

+37
-56
lines changed

amazon-qldb-driver-core/src/driver.rs

Lines changed: 18 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use aws_hyper::{DynConnector, SmithyConnector};
21
use aws_sdk_qldbsessionv2::{Client, Config};
32
use bb8::Pool;
43
use std::sync::Arc;
@@ -64,7 +63,7 @@ impl QldbDriverBuilder {
6463
pub async fn sdk_config(
6564
self,
6665
sdk_config: &aws_types::config::Config,
67-
) -> Result<QldbDriver<DynConnector>, BuilderError> {
66+
) -> Result<QldbDriver, BuilderError> {
6867
let client = Client::new(sdk_config);
6968
Ok(self.build_with_client(client).await?)
7069
}
@@ -73,18 +72,12 @@ impl QldbDriverBuilder {
7372
///
7473
/// Note that `config` is the service-specific (QldbSession) config. For
7574
/// shared config, see [`sdk_config`].
76-
pub async fn config(self, config: Config) -> Result<QldbDriver<DynConnector>, BuilderError> {
75+
pub async fn config(self, config: Config) -> Result<QldbDriver, BuilderError> {
7776
let client = Client::from_conf(config);
7877
Ok(self.build_with_client(client).await?)
7978
}
8079

81-
pub async fn build_with_client<C>(
82-
self,
83-
client: Client<C>,
84-
) -> Result<QldbDriver<C>, BuilderError>
85-
where
86-
C: SmithyConnector,
87-
{
80+
pub async fn build_with_client(self, client: Client) -> Result<QldbDriver, BuilderError> {
8881
let ledger_name = self.ledger_name.ok_or(BuilderError::UsageError(format!(
8982
"ledger_name must be initialized"
9083
)))?;
@@ -148,19 +141,13 @@ impl QldbDriverBuilder {
148141
/// we wrap all policies in a Mutex. Performance of a Mutex is unlikely to be
149142
/// the limiting factor in a QLDB application.
150143
#[derive(Clone)]
151-
pub struct QldbDriver<C>
152-
where
153-
C: SmithyConnector,
154-
{
144+
pub struct QldbDriver {
155145
pub ledger_name: Arc<String>,
156-
session_pool: Arc<Pool<QldbSessionV2Manager<C>>>,
146+
session_pool: Arc<Pool<QldbSessionV2Manager>>,
157147
transaction_retry_policy: Arc<Mutex<Box<dyn TransactionRetryPolicy + Send + Sync>>>,
158148
}
159149

160-
impl<C> QldbDriver<C>
161-
where
162-
C: SmithyConnector,
163-
{
150+
impl QldbDriver {
164151
pub fn ledger_name(&self) -> String {
165152
(*self.ledger_name).clone()
166153
}
@@ -190,15 +177,17 @@ where
190177
loop {
191178
attempt_number += 1;
192179

193-
// Connection failures at this point have already been retried by
194-
// the pool, and so we give up at this point.
195-
let mut pooled_session =
196-
self.session_pool
197-
.get()
198-
.await
199-
.map_err(|bb8| TransactError::CommunicationError {
200-
source: Box::new(bb8),
201-
})?;
180+
// We used `get_owned` to hide the lifetime of the pool from
181+
// customers. This is important because otherwise the future
182+
// inherits this lifetime and things get messy (shows up as:
183+
// `TransactionAttempt<'pool>`).
184+
let pooled_session = self.session_pool.get_owned().await.map_err(|bb8| {
185+
// Connection failures at this point have already been retried by
186+
// the pool, and so we give up at this point.
187+
TransactError::CommunicationError {
188+
source: Box::new(bb8),
189+
}
190+
})?;
202191

203192
// Note that `PooledConnection` uses the `Drop` trait to hand the
204193
// connection back to the pool. It'd be really nice to hand the
@@ -210,7 +199,7 @@ where
210199
// Rust will guarantee we do it at some point), but putting the
211200
// connection back in the pool before sleeping will probably be
212201
// beneficial.
213-
let tx = match TransactionAttempt::start(&mut pooled_session).await {
202+
let tx = match TransactionAttempt::start(pooled_session).await {
214203
Ok(tx) => tx,
215204
Err(e) => {
216205
// FIXME: Include some sort of sleep and attempt cap.
@@ -258,10 +247,6 @@ where
258247
}
259248
};
260249

261-
// See above note. We put the connection back in the pool before
262-
// adding any delay.
263-
drop(pooled_session);
264-
265250
let retry_ins = {
266251
let policy = self.transaction_retry_policy.lock().await;
267252
policy.on_err(&comm_err, attempt_number)

amazon-qldb-driver-core/src/error.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ impl From<aws_sdk_qldbsessionv2::model::TransactionError> for GenericError {
7070
/// due to user code (e.g. deserializing a QLDB document into a local type), or
7171
/// from interacting with the QLDB service.
7272
#[derive(Debug, Error)]
73+
#[non_exhaustive]
7374
pub enum TransactError<E>
7475
where
7576
E: std::error::Error + 'static,
@@ -94,7 +95,7 @@ where
9495
source: BoxError,
9596
},
9697

97-
#[error("transaction was aborted")]
98+
#[error("transaction was aborted by user code")]
9899
Aborted,
99100

100101
#[error("transaction failed after {attempts} attempts, last error: {last_err}")]

amazon-qldb-driver-core/src/pool.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use async_trait::async_trait;
2-
use aws_hyper::{SdkError, SmithyConnector};
2+
use aws_hyper::{DynConnector, SdkError, SmithyConnector};
33
use aws_sdk_qldbsessionv2::{
44
error::SendCommandError,
55
model::{CommandStream, ResultStream},
@@ -39,10 +39,7 @@ impl ErrorSink<ConnectionError> for QldbErrorLoggingErrorSink {
3939

4040
// FIXME: This is not generic over the client (it uses the rust awssdk
4141
// directly), which means we cannot change the sdk (e.g. to support javascript).
42-
pub struct QldbSessionV2Manager<C>
43-
where
44-
C: SmithyConnector,
45-
{
42+
pub struct QldbSessionV2Manager<C = DynConnector> {
4643
client: Client<C>,
4744
pub ledger_name: String,
4845
}

amazon-qldb-driver-core/src/transaction.rs

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use aws_sdk_qldbsessionv2::model::{
22
AbortTransactionRequest, CommandStream, CommitTransactionRequest, ExecuteStatementRequest,
33
FetchPageRequest, ResultStream, StartTransactionRequest,
44
};
5+
use bb8::PooledConnection;
56
use bytes::Bytes;
67
use ion_c_sys::reader::IonCReaderHandle;
78
use ion_c_sys::result::IonCError;
@@ -10,7 +11,7 @@ use std::marker::PhantomData;
1011
use tracing::debug;
1112

1213
use crate::error;
13-
use crate::pool::QldbHttp2Connection;
14+
use crate::pool::QldbSessionV2Manager;
1415
use crate::{error::TransactError, execution_stats::ExecutionStats};
1516

1617
/// The results of executing a statement.
@@ -68,9 +69,9 @@ pub enum TransactionDisposition<R> {
6869
/// succeeds!
6970
///
7071
/// `E` represents any custom error variant the user may throw.
71-
pub struct TransactionAttempt<'pool, E> {
72-
/// A pool connection that we'll send our commands down.
73-
connection: &'pool mut QldbHttp2Connection,
72+
pub struct TransactionAttempt<E> {
73+
/// A pooled connection that we'll send our commands down.
74+
connection: PooledConnection<'static, QldbSessionV2Manager>,
7475

7576
/// The id of this transaction attempt. This is a speculative transaction
7677
/// id. That is, if the transaction commits, then this id is the id of the
@@ -93,13 +94,13 @@ pub struct TransactionAttempt<'pool, E> {
9394
err: PhantomData<E>,
9495
}
9596

96-
impl<'pool, E> TransactionAttempt<'pool, E>
97+
impl<E> TransactionAttempt<E>
9798
where
9899
E: std::error::Error + 'static,
99100
{
100101
pub(crate) async fn start(
101-
connection: &'pool mut QldbHttp2Connection,
102-
) -> Result<TransactionAttempt<'pool, E>, TransactError<E>> {
102+
mut connection: PooledConnection<'static, QldbSessionV2Manager>,
103+
) -> Result<TransactionAttempt<E>, TransactError<E>> {
103104
let mut accumulated_execution_stats = ExecutionStats::default();
104105
let resp = connection
105106
.send_streaming_command(CommandStream::StartTransaction(
@@ -127,7 +128,7 @@ where
127128
})
128129
}
129130

130-
pub fn statement<S>(&mut self, statement: S) -> StatementBuilder<'pool, '_, E>
131+
pub fn statement<S>(&mut self, statement: S) -> StatementBuilder<'_, E>
131132
where
132133
S: Into<String>,
133134
{
@@ -287,19 +288,16 @@ where
287288
}
288289
}
289290

290-
pub struct StatementBuilder<'pool, 'tx, E> {
291-
attempt: &'tx mut TransactionAttempt<'pool, E>,
291+
pub struct StatementBuilder<'tx, E> {
292+
attempt: &'tx mut TransactionAttempt<E>,
292293
statement: Statement,
293294
}
294295

295-
impl<'pool, 'tx, E> StatementBuilder<'pool, 'tx, E>
296+
impl<'tx, E> StatementBuilder<'tx, E>
296297
where
297298
E: std::error::Error + 'static,
298299
{
299-
fn new(
300-
attempt: &'tx mut TransactionAttempt<'pool, E>,
301-
partiql: String,
302-
) -> StatementBuilder<'pool, 'tx, E> {
300+
fn new(attempt: &'tx mut TransactionAttempt<E>, partiql: String) -> StatementBuilder<'tx, E> {
303301
StatementBuilder {
304302
attempt,
305303
statement: Statement {
@@ -313,7 +311,7 @@ where
313311
// 1. need an IonElement so we can hash it. in the future, we hope to remove this as a requirement
314312
// 2. perhaps we want an in-crate trait for coherency reasons
315313
// TODO: make public when ready
316-
pub fn param<B>(mut self, param: B) -> StatementBuilder<'pool, 'tx, E>
314+
pub fn param<B>(mut self, param: B) -> StatementBuilder<'tx, E>
317315
where
318316
B: Into<Bytes>,
319317
{

amazon-qldb-driver/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//!
33
//! Usage example:
44
//!
5-
//! ```
5+
//! ```no_run
66
//! use amazon_qldb_driver::awssdk::Config;
77
//! use amazon_qldb_driver::{QldbDriverBuilder, TransactError};
88
//! use tokio;

0 commit comments

Comments
 (0)