diff --git a/core/Cargo.toml b/core/Cargo.toml index 5ba079e964f0..86ef81840572 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -182,7 +182,7 @@ services-pcloud = [] services-persy = ["dep:persy", "internal-tokio-rt"] services-postgresql = ["dep:sqlx", "sqlx?/postgres"] services-redb = ["dep:redb", "internal-tokio-rt"] -services-redis = ["dep:redis", "dep:bb8", "redis?/tokio-rustls-comp"] +services-redis = ["dep:redis", "dep:bb8", "redis?/tokio-rustls-comp", "dep:ouroboros"] services-redis-native-tls = ["services-redis", "redis?/tokio-native-tls-comp"] services-rocksdb = ["dep:rocksdb", "internal-tokio-rt"] services-s3 = [ diff --git a/core/src/services/redis/backend.rs b/core/src/services/redis/backend.rs index 19afe34c7655..f5939d840b89 100644 --- a/core/src/services/redis/backend.rs +++ b/core/src/services/redis/backend.rs @@ -16,9 +16,13 @@ // under the License. use bb8::RunError; +use futures::Stream; +use futures::StreamExt; use http::Uri; +use ouroboros::self_referencing; use redis::cluster::ClusterClient; use redis::cluster::ClusterClientBuilder; +use redis::AsyncIter; use redis::Client; use redis::ConnectionAddr; use redis::ConnectionInfo; @@ -27,6 +31,9 @@ use redis::RedisConnectionInfo; use std::fmt::Debug; use std::fmt::Formatter; use std::path::PathBuf; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; use std::time::Duration; use tokio::sync::OnceCell; @@ -291,8 +298,23 @@ impl Debug for Adapter { impl Adapter { async fn conn(&self) -> Result> { - let pool = self - .conn + let pool = self.pool().await?; + Adapter::conn_from_pool(pool).await + } + + async fn conn_from_pool( + pool: &bb8::Pool, + ) -> Result> { + pool.get().await.map_err(|err| match err { + RunError::TimedOut => { + Error::new(ErrorKind::Unexpected, "get connection from pool failed").set_temporary() + } + RunError::User(err) => err, + }) + } + + async fn pool(&self) -> Result<&bb8::Pool> { + self.conn .get_or_try_init(|| async { bb8::Pool::builder() .build(self.get_redis_connection_manager()) @@ -302,13 +324,7 @@ impl Adapter { .set_source(err) }) }) - .await?; - pool.get().await.map_err(|err| match err { - RunError::TimedOut => { - Error::new(ErrorKind::Unexpected, "get connection from pool failed").set_temporary() - } - RunError::User(err) => err, - }) + .await } fn get_redis_connection_manager(&self) -> RedisConnectionManager { @@ -326,8 +342,43 @@ impl Adapter { } } +#[self_referencing] +struct RedisAsyncConnIter<'a> { + conn: bb8::PooledConnection<'a, RedisConnectionManager>, + + #[borrows(mut conn)] + #[not_covariant] + iter: AsyncIter<'this, String>, +} + +#[self_referencing] +pub struct RedisScanner { + pool: bb8::Pool, + path: String, + + #[borrows(pool, path)] + #[not_covariant] + inner: RedisAsyncConnIter<'this>, +} + +unsafe impl Sync for RedisScanner {} + +impl Stream for RedisScanner { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.with_inner_mut(|s| s.with_iter_mut(|v| v.poll_next_unpin(cx).map(|v| v.map(Ok)))) + } +} + +impl kv::Scan for RedisScanner { + async fn next(&mut self) -> Result> { + ::next(self).await.transpose() + } +} + impl kv::Adapter for Adapter { - type Scanner = (); + type Scanner = RedisScanner; fn info(&self) -> kv::Info { kv::Info::new( @@ -336,6 +387,12 @@ impl kv::Adapter for Adapter { Capability { read: true, write: true, + // due to limitation of Redis itself, + // on cluster mode we cannot get full list of keys via SCAN, + // so here we disable it on cluster mode to avoid confusions. + // TODO: we can perform multiple SCAN on each cluster node + // and merge the result to simulate the behavior of list here. + list: self.cluster_client.is_none(), ..Default::default() }, @@ -366,4 +423,19 @@ impl kv::Adapter for Adapter { conn.append(key, value).await?; Ok(()) } + + async fn scan(&self, path: &str) -> Result { + let pool = self.pool().await?.clone(); + + RedisScanner::try_new_async_send(pool, path.to_string(), |pool, path| { + Box::pin(async { + let conn = Adapter::conn_from_pool(pool).await?; + RedisAsyncConnIter::try_new_async_send(conn, |conn| { + Box::pin(async { conn.scan(path).await }) + }) + .await + }) + }) + .await + } } diff --git a/core/src/services/redis/core.rs b/core/src/services/redis/core.rs index 041ed87169b9..6c5ca8cad2ce 100644 --- a/core/src/services/redis/core.rs +++ b/core/src/services/redis/core.rs @@ -26,6 +26,7 @@ use redis::cluster::ClusterClient; use redis::cluster_async::ClusterConnection; use redis::from_redis_value; use redis::AsyncCommands; +use redis::AsyncIter; use redis::Client; use redis::RedisError; @@ -105,6 +106,18 @@ impl RedisConnection { } Ok(()) } + + pub async fn scan(&mut self, prefix: &str) -> crate::Result> { + let pattern = format!("{}*", prefix); + Ok(match self { + RedisConnection::Normal(ref mut conn) => { + conn.scan_match(pattern).await.map_err(format_redis_error)? + } + RedisConnection::Cluster(ref mut conn) => { + conn.scan_match(pattern).await.map_err(format_redis_error)? + } + }) + } } #[derive(Clone)] diff --git a/core/src/services/redis/docs.md b/core/src/services/redis/docs.md index 8d183ea74b8b..bea0ee0a1d80 100644 --- a/core/src/services/redis/docs.md +++ b/core/src/services/redis/docs.md @@ -9,7 +9,7 @@ This service can be used to: - [x] delete - [x] copy - [x] rename -- [ ] ~~list~~ +- [x] list - [ ] ~~presign~~ - [ ] blocking diff --git a/fixtures/redis/docker-compose-kvrocks.yml b/fixtures/redis/docker-compose-kvrocks.yml index 25e55a15f52f..f4e24d76a9de 100644 --- a/fixtures/redis/docker-compose-kvrocks.yml +++ b/fixtures/redis/docker-compose-kvrocks.yml @@ -19,6 +19,6 @@ version: '3.8' services: redis: - image: apache/kvrocks:2.5.1 + image: apache/kvrocks:2.10.1 ports: - '6379:6666'