Skip to content

Commit 7ee95f4

Browse files
Refactor client (#73)
* client refactor fix remove clone clean up fix * wrap parameters in Arc * add client bench
1 parent 5d6241c commit 7ee95f4

File tree

6 files changed

+192
-155
lines changed

6 files changed

+192
-155
lines changed

Diff for: Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# -*- eval: (cargo-minor-mode 1) -*-
22

33
[workspace]
4-
members = ["influxdb", "influxdb_derive"]
4+
members = ["influxdb", "influxdb_derive", "benches"]
55

66
[patch.crates-io]
77
influxdb = { path = "./influxdb" }

Diff for: benches/Cargo.toml

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# -*- eval: (cargo-minor-mode 1) -*-
2+
3+
[package]
4+
name = "benches"
5+
version = "0.0.0"
6+
publish = false
7+
edition = "2018"
8+
9+
[dev-dependencies]
10+
chrono = { version = "0.4.11", features = ["serde"] }
11+
futures = "0.3.4"
12+
influxdb = { path = "../influxdb", features = ["derive"] }
13+
tokio = { version = "0.2.22", features = ["macros", "rt-threaded", "sync"] }
14+
15+
[[bench]]
16+
name = "client"
17+
path = "client.rs"
18+
harness = false

Diff for: benches/client.rs

+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
use chrono::{DateTime, Utc};
2+
use futures::stream::StreamExt;
3+
use influxdb::Error;
4+
use influxdb::InfluxDbWriteable;
5+
use influxdb::{Client, Query};
6+
use std::sync::Arc;
7+
use std::time::Instant;
8+
use tokio::sync::mpsc::unbounded_channel;
9+
use tokio::sync::Semaphore;
10+
11+
#[derive(InfluxDbWriteable, Clone)]
12+
struct WeatherReading {
13+
time: DateTime<Utc>,
14+
humidity: i32,
15+
#[tag]
16+
wind_direction: String,
17+
}
18+
19+
#[tokio::main]
20+
async fn main() {
21+
let db_name = "bench";
22+
let url = "http://localhost:8086";
23+
let number_of_total_requests = 20000;
24+
let concurrent_requests = 1000;
25+
26+
let client = Client::new(url, db_name);
27+
let concurrency_limit = Arc::new(Semaphore::new(concurrent_requests));
28+
29+
prepare_influxdb(&client, db_name).await;
30+
let measurements = generate_measurements(number_of_total_requests);
31+
let (tx, mut rx) = unbounded_channel::<Result<String, Error>>();
32+
33+
let start = Instant::now();
34+
for m in measurements {
35+
let permit = concurrency_limit.clone().acquire_owned().await;
36+
let client_task = client.clone();
37+
let tx_task = tx.clone();
38+
tokio::spawn(async move {
39+
let res = client_task.query(&m.into_query("weather")).await;
40+
let _ = tx_task.send(res);
41+
drop(permit);
42+
});
43+
}
44+
drop(tx);
45+
46+
let mut successful_count = 0;
47+
let mut error_count = 0;
48+
while let Some(res) = rx.next().await {
49+
if res.is_err() {
50+
error_count += 1;
51+
} else {
52+
successful_count += 1;
53+
}
54+
}
55+
56+
let end = Instant::now();
57+
58+
println!(
59+
"Throughput: {:.1} request/s",
60+
1000000.0 * successful_count as f64 / (end - start).as_micros() as f64
61+
);
62+
println!(
63+
"{} successful requests, {} errors",
64+
successful_count, error_count
65+
);
66+
}
67+
68+
async fn prepare_influxdb(client: &Client, db_name: &str) {
69+
let create_db_stmt = format!("CREATE DATABASE {}", db_name);
70+
client
71+
.query(&Query::raw_read_query(create_db_stmt))
72+
.await
73+
.expect("failed to create database");
74+
}
75+
76+
fn generate_measurements(n: u64) -> Vec<WeatherReading> {
77+
(0..n)
78+
.collect::<Vec<u64>>()
79+
.iter_mut()
80+
.map(|_| WeatherReading {
81+
time: Utc::now(),
82+
humidity: 30,
83+
wind_direction: String::from("north"),
84+
})
85+
.collect()
86+
}

Diff for: influxdb/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ futures = "0.3.4"
2121
lazy_static = "1.4.0"
2222
influxdb_derive = { version = "0.2.0", optional = true }
2323
regex = "1.3.5"
24-
reqwest = { version = "0.10.4", features = ["json"] }
24+
reqwest = { version = "0.10.8", features = ["json"] }
2525
serde = { version = "1.0.104", features = ["derive"], optional = true }
2626
serde_json = { version = "1.0.48", optional = true }
2727
thiserror = "1.0"

Diff for: influxdb/src/client/mod.rs

+57-121
Original file line numberDiff line numberDiff line change
@@ -16,49 +16,19 @@
1616
//! ```
1717
1818
use futures::prelude::*;
19-
use reqwest::{self, Client as ReqwestClient, StatusCode, Url};
19+
use reqwest::{self, Client as ReqwestClient, StatusCode};
2020

2121
use crate::query::QueryTypes;
2222
use crate::Error;
2323
use crate::Query;
24-
25-
#[derive(Clone, Debug)]
26-
/// Internal Authentication representation
27-
pub(crate) struct Authentication {
28-
pub username: String,
29-
pub password: String,
30-
}
24+
use std::sync::Arc;
3125

3226
#[derive(Clone, Debug)]
3327
/// Internal Representation of a Client
3428
pub struct Client {
35-
url: String,
36-
database: String,
37-
auth: Option<Authentication>,
38-
}
39-
40-
impl Into<Vec<(String, String)>> for Client {
41-
fn into(self) -> Vec<(String, String)> {
42-
let mut vec: Vec<(String, String)> = Vec::new();
43-
vec.push(("db".to_string(), self.database));
44-
if let Some(auth) = self.auth {
45-
vec.push(("u".to_string(), auth.username));
46-
vec.push(("p".to_string(), auth.password));
47-
}
48-
vec
49-
}
50-
}
51-
52-
impl<'a> Into<Vec<(String, String)>> for &'a Client {
53-
fn into(self) -> Vec<(String, String)> {
54-
let mut vec: Vec<(String, String)> = Vec::new();
55-
vec.push(("db".to_string(), self.database.to_owned()));
56-
if let Some(auth) = &self.auth {
57-
vec.push(("u".to_string(), auth.username.to_owned()));
58-
vec.push(("p".to_string(), auth.password.to_owned()));
59-
}
60-
vec
61-
}
29+
pub(crate) url: Arc<String>,
30+
pub(crate) parameters: Arc<Vec<(&'static str, String)>>,
31+
pub(crate) client: ReqwestClient,
6232
}
6333

6434
impl Client {
@@ -82,9 +52,9 @@ impl Client {
8252
S2: Into<String>,
8353
{
8454
Client {
85-
url: url.into(),
86-
database: database.into(),
87-
auth: None,
55+
url: Arc::new(url.into()),
56+
parameters: Arc::new(vec![("db", database.into())]),
57+
client: ReqwestClient::new(),
8858
}
8959
}
9060

@@ -93,7 +63,7 @@ impl Client {
9363
/// # Arguments
9464
///
9565
/// * username: The Username for InfluxDB.
96-
/// * password: THe Password for the user.
66+
/// * password: The Password for the user.
9767
///
9868
/// # Examples
9969
///
@@ -107,16 +77,17 @@ impl Client {
10777
S1: Into<String>,
10878
S2: Into<String>,
10979
{
110-
self.auth = Some(Authentication {
111-
username: username.into(),
112-
password: password.into(),
113-
});
80+
let mut with_auth = self.parameters.as_ref().clone();
81+
with_auth.push(("u", username.into()));
82+
with_auth.push(("p", password.into()));
83+
self.parameters = Arc::new(with_auth);
11484
self
11585
}
11686

11787
/// Returns the name of the database the client is using
11888
pub fn database_name(&self) -> &str {
119-
&self.database
89+
// safe to unwrap: we always set the database name in `Self::new`
90+
&self.parameters.first().unwrap().1
12091
}
12192

12293
/// Returns the URL of the InfluxDB installation the client is using
@@ -128,7 +99,11 @@ impl Client {
12899
///
129100
/// Returns a tuple of build type and version number
130101
pub async fn ping(&self) -> Result<(String, String), Error> {
131-
let res = reqwest::get(format!("{}/ping", self.url).as_str())
102+
let url = &format!("{}/ping", self.url);
103+
let res = self
104+
.client
105+
.get(url)
106+
.send()
132107
.await
133108
.map_err(|err| Error::ProtocolError {
134109
error: format!("{}", err),
@@ -197,45 +172,45 @@ impl Client {
197172
error: format!("{}", err),
198173
})?;
199174

200-
let basic_parameters: Vec<(String, String)> = self.into();
201-
202-
let client = match q.into() {
175+
let request_builder = match q.into() {
203176
QueryTypes::Read(_) => {
204177
let read_query = query.get();
205-
let mut url = Url::parse_with_params(
206-
format!("{url}/query", url = self.database_url()).as_str(),
207-
basic_parameters,
208-
)
209-
.map_err(|err| Error::UrlConstructionError {
210-
error: format!("{}", err),
211-
})?;
212-
213-
url.query_pairs_mut().append_pair("q", &read_query);
178+
let url = &format!("{}/query", &self.url);
179+
let query = [("q", &read_query)];
214180

215181
if read_query.contains("SELECT") || read_query.contains("SHOW") {
216-
ReqwestClient::new().get(url)
182+
self.client
183+
.get(url)
184+
.query(self.parameters.as_ref())
185+
.query(&query)
217186
} else {
218-
ReqwestClient::new().post(url)
187+
self.client
188+
.post(url)
189+
.query(self.parameters.as_ref())
190+
.query(&query)
219191
}
220192
}
221193
QueryTypes::Write(write_query) => {
222-
let mut url = Url::parse_with_params(
223-
format!("{url}/write", url = self.database_url()).as_str(),
224-
basic_parameters,
225-
)
226-
.map_err(|err| Error::InvalidQueryError {
227-
error: format!("{}", err),
228-
})?;
229-
230-
url.query_pairs_mut()
231-
.append_pair("precision", &write_query.get_precision());
232-
233-
ReqwestClient::new().post(url).body(query.get())
194+
let url = &format!("{}/write", &self.url);
195+
let precision = [("precision", write_query.get_precision())];
196+
197+
self.client
198+
.post(url)
199+
.query(self.parameters.as_ref())
200+
.query(&precision)
201+
.body(query.get())
234202
}
235203
};
236204

237-
let res = client
238-
.send()
205+
let request = request_builder
206+
.build()
207+
.map_err(|err| Error::UrlConstructionError {
208+
error: format!("{}", &err),
209+
})?;
210+
211+
let res = self
212+
.client
213+
.execute(request)
239214
.map_err(|err| Error::ConnectionError { error: err })
240215
.await?;
241216

@@ -262,67 +237,28 @@ impl Client {
262237

263238
#[cfg(test)]
264239
mod tests {
265-
use crate::Client;
240+
use super::Client;
266241

267242
#[test]
268243
fn test_fn_database() {
269244
let client = Client::new("http://localhost:8068", "database");
270-
assert_eq!("database", client.database_name());
245+
assert_eq!(client.database_name(), "database");
246+
assert_eq!(client.database_url(), "http://localhost:8068");
271247
}
272248

273249
#[test]
274250
fn test_with_auth() {
275251
let client = Client::new("http://localhost:8068", "database");
276-
assert_eq!(client.url, "http://localhost:8068");
277-
assert_eq!(client.database, "database");
278-
assert!(client.auth.is_none());
279-
let with_auth = client.with_auth("username", "password");
280-
assert!(with_auth.auth.is_some());
281-
let auth = with_auth.auth.unwrap();
282-
assert_eq!(&auth.username, "username");
283-
assert_eq!(&auth.password, "password");
284-
}
285-
286-
#[test]
287-
fn test_into_impl() {
288-
let client = Client::new("http://localhost:8068", "database");
289-
assert!(client.auth.is_none());
290-
let basic_parameters: Vec<(String, String)> = client.into();
291-
assert_eq!(
292-
vec![("db".to_string(), "database".to_string())],
293-
basic_parameters
294-
);
295-
296-
let with_auth =
297-
Client::new("http://localhost:8068", "database").with_auth("username", "password");
298-
let basic_parameters_with_auth: Vec<(String, String)> = with_auth.into();
299-
assert_eq!(
300-
vec![
301-
("db".to_string(), "database".to_string()),
302-
("u".to_string(), "username".to_string()),
303-
("p".to_string(), "password".to_string())
304-
],
305-
basic_parameters_with_auth
306-
);
307-
308-
let client = Client::new("http://localhost:8068", "database");
309-
assert!(client.auth.is_none());
310-
let basic_parameters: Vec<(String, String)> = (&client).into();
311-
assert_eq!(
312-
vec![("db".to_string(), "database".to_string())],
313-
basic_parameters
314-
);
252+
assert_eq!(vec![("db", "database".to_string())], *client.parameters);
315253

316-
let with_auth =
317-
Client::new("http://localhost:8068", "database").with_auth("username", "password");
318-
let basic_parameters_with_auth: Vec<(String, String)> = (&with_auth).into();
254+
let with_auth = client.with_auth("username", "password");
319255
assert_eq!(
320256
vec![
321-
("db".to_string(), "database".to_string()),
322-
("u".to_string(), "username".to_string()),
323-
("p".to_string(), "password".to_string())
257+
("db", "database".to_string()),
258+
("u", "username".to_string()),
259+
("p", "password".to_string())
324260
],
325-
basic_parameters_with_auth
261+
*with_auth.parameters
326262
);
327263
}
328264
}

0 commit comments

Comments
 (0)