Skip to content

Commit 79422c0

Browse files
authored
collect all clients to one single folder (#119)
1 parent b1e365b commit 79422c0

File tree

11 files changed

+152
-144
lines changed

11 files changed

+152
-144
lines changed

examples/hello_world.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,12 @@
1313
1414
#![warn(rust_2018_idioms)]
1515

16-
use mini_redis::{client, Result};
16+
use mini_redis::{clients::Client, Result};
1717

1818
#[tokio::main]
1919
pub async fn main() -> Result<()> {
2020
// Open a connection to the mini-redis address.
21-
let mut client = client::connect("127.0.0.1:6379").await?;
21+
let mut client = Client::connect("127.0.0.1:6379").await?;
2222

2323
// Set the key "hello" with value "world"
2424
client.set("hello", "world".into()).await?;

examples/pub.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@
1717
1818
#![warn(rust_2018_idioms)]
1919

20-
use mini_redis::{client, Result};
20+
use mini_redis::{clients::Client, Result};
2121

2222
#[tokio::main]
2323
async fn main() -> Result<()> {
2424
// Open a connection to the mini-redis address.
25-
let mut client = client::connect("127.0.0.1:6379").await?;
25+
let mut client = Client::connect("127.0.0.1:6379").await?;
2626

2727
// publish message `bar` on channel foo
2828
client.publish("foo", "bar".into()).await?;

examples/sub.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@
1717
1818
#![warn(rust_2018_idioms)]
1919

20-
use mini_redis::{client, Result};
20+
use mini_redis::{clients::Client, Result};
2121

2222
#[tokio::main]
2323
pub async fn main() -> Result<()> {
2424
// Open a connection to the mini-redis address.
25-
let client = client::connect("127.0.0.1:6379").await?;
25+
let client = Client::connect("127.0.0.1:6379").await?;
2626

2727
// subscribe to channel foo
2828
let mut subscriber = client.subscribe(vec!["foo".into()]).await?;

src/bin/cli.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use mini_redis::{client, DEFAULT_PORT};
1+
use mini_redis::{clients::Client, DEFAULT_PORT};
22

33
use bytes::Bytes;
44
use clap::{Parser, Subcommand};
@@ -86,7 +86,7 @@ async fn main() -> mini_redis::Result<()> {
8686
let addr = format!("{}:{}", cli.host, cli.port);
8787

8888
// Establish a connection
89-
let mut client = client::connect(&addr).await?;
89+
let mut client = Client::connect(&addr).await?;
9090

9191
// Process the requested command
9292
match cli.command {

src/blocking_client.rs renamed to src/clients/blocking_client.rs

+39-39
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::time::Duration;
77
use tokio::net::ToSocketAddrs;
88
use tokio::runtime::Runtime;
99

10-
pub use crate::client::Message;
10+
pub use crate::clients::Message;
1111

1212
/// Established connection with a Redis server.
1313
///
@@ -18,7 +18,7 @@ pub use crate::client::Message;
1818
/// Requests are issued using the various methods of `Client`.
1919
pub struct BlockingClient {
2020
/// The asynchronous `Client`.
21-
inner: crate::client::Client,
21+
inner: crate::clients::Client,
2222

2323
/// A `current_thread` runtime for executing operations on the asynchronous
2424
/// client in a blocking manner.
@@ -33,7 +33,7 @@ pub struct BlockingClient {
3333
/// called.
3434
pub struct BlockingSubscriber {
3535
/// The asynchronous `Subscriber`.
36-
inner: crate::client::Subscriber,
36+
inner: crate::clients::Subscriber,
3737

3838
/// A `current_thread` runtime for executing operations on the asynchronous
3939
/// `Subscriber` in a blocking manner.
@@ -43,43 +43,43 @@ pub struct BlockingSubscriber {
4343
/// The iterator returned by `Subscriber::into_iter`.
4444
struct SubscriberIterator {
4545
/// The asynchronous `Subscriber`.
46-
inner: crate::client::Subscriber,
46+
inner: crate::clients::Subscriber,
4747

4848
/// A `current_thread` runtime for executing operations on the asynchronous
4949
/// `Subscriber` in a blocking manner.
5050
rt: Runtime,
5151
}
5252

53-
/// Establish a connection with the Redis server located at `addr`.
54-
///
55-
/// `addr` may be any type that can be asynchronously converted to a
56-
/// `SocketAddr`. This includes `SocketAddr` and strings. The `ToSocketAddrs`
57-
/// trait is the Tokio version and not the `std` version.
58-
///
59-
/// # Examples
60-
///
61-
/// ```no_run
62-
/// use mini_redis::blocking_client;
63-
///
64-
/// fn main() {
65-
/// let client = match blocking_client::connect("localhost:6379") {
66-
/// Ok(client) => client,
67-
/// Err(_) => panic!("failed to establish connection"),
68-
/// };
69-
/// # drop(client);
70-
/// }
71-
/// ```
72-
pub fn connect<T: ToSocketAddrs>(addr: T) -> crate::Result<BlockingClient> {
73-
let rt = tokio::runtime::Builder::new_current_thread()
74-
.enable_all()
75-
.build()?;
53+
impl BlockingClient {
54+
/// Establish a connection with the Redis server located at `addr`.
55+
///
56+
/// `addr` may be any type that can be asynchronously converted to a
57+
/// `SocketAddr`. This includes `SocketAddr` and strings. The `ToSocketAddrs`
58+
/// trait is the Tokio version and not the `std` version.
59+
///
60+
/// # Examples
61+
///
62+
/// ```no_run
63+
/// use mini_redis::clients::BlockingClient;
64+
///
65+
/// fn main() {
66+
/// let client = match BlockingClient::connect("localhost:6379") {
67+
/// Ok(client) => client,
68+
/// Err(_) => panic!("failed to establish connection"),
69+
/// };
70+
/// # drop(client);
71+
/// }
72+
/// ```
73+
pub fn connect<T: ToSocketAddrs>(addr: T) -> crate::Result<BlockingClient> {
74+
let rt = tokio::runtime::Builder::new_current_thread()
75+
.enable_all()
76+
.build()?;
7677

77-
let inner = rt.block_on(crate::client::connect(addr))?;
78+
let inner = rt.block_on(crate::clients::Client::connect(addr))?;
7879

79-
Ok(BlockingClient { inner, rt })
80-
}
80+
Ok(BlockingClient { inner, rt })
81+
}
8182

82-
impl BlockingClient {
8383
/// Get the value of key.
8484
///
8585
/// If the key does not exist the special value `None` is returned.
@@ -89,10 +89,10 @@ impl BlockingClient {
8989
/// Demonstrates basic usage.
9090
///
9191
/// ```no_run
92-
/// use mini_redis::blocking_client;
92+
/// use mini_redis::clients::BlockingClient;
9393
///
9494
/// fn main() {
95-
/// let mut client = blocking_client::connect("localhost:6379").unwrap();
95+
/// let mut client = BlockingClient::connect("localhost:6379").unwrap();
9696
///
9797
/// let val = client.get("foo").unwrap();
9898
/// println!("Got = {:?}", val);
@@ -115,10 +115,10 @@ impl BlockingClient {
115115
/// Demonstrates basic usage.
116116
///
117117
/// ```no_run
118-
/// use mini_redis::blocking_client;
118+
/// use mini_redis::clients::BlockingClient;
119119
///
120120
/// fn main() {
121-
/// let mut client = blocking_client::connect("localhost:6379").unwrap();
121+
/// let mut client = BlockingClient::connect("localhost:6379").unwrap();
122122
///
123123
/// client.set("foo", "bar".into()).unwrap();
124124
///
@@ -149,13 +149,13 @@ impl BlockingClient {
149149
/// favorable.
150150
///
151151
/// ```no_run
152-
/// use mini_redis::blocking_client;
152+
/// use mini_redis::clients::BlockingClient;
153153
/// use std::thread;
154154
/// use std::time::Duration;
155155
///
156156
/// fn main() {
157157
/// let ttl = Duration::from_millis(500);
158-
/// let mut client = blocking_client::connect("localhost:6379").unwrap();
158+
/// let mut client = BlockingClient::connect("localhost:6379").unwrap();
159159
///
160160
/// client.set_expires("foo", "bar".into(), ttl).unwrap();
161161
///
@@ -191,10 +191,10 @@ impl BlockingClient {
191191
/// Demonstrates basic usage.
192192
///
193193
/// ```no_run
194-
/// use mini_redis::blocking_client;
194+
/// use mini_redis::clients::BlockingClient;
195195
///
196196
/// fn main() {
197-
/// let mut client = blocking_client::connect("localhost:6379").unwrap();
197+
/// let mut client = BlockingClient::connect("localhost:6379").unwrap();
198198
///
199199
/// let val = client.publish("foo", "bar".into()).unwrap();
200200
/// println!("Got = {:?}", val);

src/buffer.rs renamed to src/clients/buffered_client.rs

+33-33
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,11 @@
1-
use crate::client::Client;
1+
use crate::clients::Client;
22
use crate::Result;
33

44
use bytes::Bytes;
55
use tokio::sync::mpsc::{channel, Receiver, Sender};
66
use tokio::sync::oneshot;
77

8-
/// Create a new client request buffer
9-
///
10-
/// The `Client` performs Redis commands directly on the TCP connection. Only a
11-
/// single request may be in-flight at a given time and operations require
12-
/// mutable access to the `Client` handle. This prevents using a single Redis
13-
/// connection from multiple Tokio tasks.
14-
///
15-
/// The strategy for dealing with this class of problem is to spawn a dedicated
16-
/// Tokio task to manage the Redis connection and using "message passing" to
17-
/// operate on the connection. Commands are pushed into a channel. The
18-
/// connection task pops commands off of the channel and applies them to the
19-
/// Redis connection. When the response is received, it is forwarded to the
20-
/// original requester.
21-
///
22-
/// The returned `Buffer` handle may be cloned before passing the new handle to
23-
/// separate tasks.
24-
pub fn buffer(client: Client) -> Buffer {
25-
// Setting the message limit to a hard coded value of 32. in a real-app, the
26-
// buffer size should be configurable, but we don't need to do that here.
27-
let (tx, rx) = channel(32);
28-
29-
// Spawn a task to process requests for the connection.
30-
tokio::spawn(async move { run(client, rx).await });
31-
32-
// Return the `Buffer` handle.
33-
Buffer { tx }
34-
}
35-
36-
// Enum used to message pass the requested command from the `Buffer` handle
8+
// Enum used to message pass the requested command from the `BufferedClient` handle
379
#[derive(Debug)]
3810
enum Command {
3911
Get(String),
@@ -53,7 +25,7 @@ type Message = (Command, oneshot::Sender<Result<Option<Bytes>>>);
5325
/// response is returned back to the caller via a `oneshot`.
5426
async fn run(mut client: Client, mut rx: Receiver<Message>) {
5527
// Repeatedly pop messages from the channel. A return value of `None`
56-
// indicates that all `Buffer` handles have dropped and there will never be
28+
// indicates that all `BufferedClient` handles have dropped and there will never be
5729
// another message sent on the channel.
5830
while let Some((cmd, tx)) = rx.recv().await {
5931
// The command is forwarded to the connection
@@ -71,11 +43,39 @@ async fn run(mut client: Client, mut rx: Receiver<Message>) {
7143
}
7244

7345
#[derive(Clone)]
74-
pub struct Buffer {
46+
pub struct BufferedClient {
7547
tx: Sender<Message>,
7648
}
7749

78-
impl Buffer {
50+
impl BufferedClient {
51+
/// Create a new client request buffer
52+
///
53+
/// The `Client` performs Redis commands directly on the TCP connection. Only a
54+
/// single request may be in-flight at a given time and operations require
55+
/// mutable access to the `Client` handle. This prevents using a single Redis
56+
/// connection from multiple Tokio tasks.
57+
///
58+
/// The strategy for dealing with this class of problem is to spawn a dedicated
59+
/// Tokio task to manage the Redis connection and using "message passing" to
60+
/// operate on the connection. Commands are pushed into a channel. The
61+
/// connection task pops commands off of the channel and applies them to the
62+
/// Redis connection. When the response is received, it is forwarded to the
63+
/// original requester.
64+
///
65+
/// The returned `BufferedClient` handle may be cloned before passing the new handle to
66+
/// separate tasks.
67+
pub fn buffer(client: Client) -> BufferedClient {
68+
// Setting the message limit to a hard coded value of 32. in a real-app, the
69+
// buffer size should be configurable, but we don't need to do that here.
70+
let (tx, rx) = channel(32);
71+
72+
// Spawn a task to process requests for the connection.
73+
tokio::spawn(async move { run(client, rx).await });
74+
75+
// Return the `BufferedClient` handle.
76+
BufferedClient { tx }
77+
}
78+
7979
/// Get the value of a key.
8080
///
8181
/// Same as `Client::get` but requests are **buffered** until the associated

0 commit comments

Comments
 (0)