Skip to content

Commit 90f5f95

Browse files
authored
Fix Encoding/Escaping according to the InfluxDb Line-Protocol (#55)
* Fix encoding/escaping according to the influxdb line protocol. Co-authored-by: Gero Gerke <[email protected]>
1 parent 789b388 commit 90f5f95

File tree

4 files changed

+139
-13
lines changed

4 files changed

+139
-13
lines changed

Diff for: Cargo.toml

+6
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,12 @@ futures = "0.3.4"
2020
reqwest = { version = "0.10.1", features = ["json"] }
2121
serde = { version = "1.0.104", features = ["derive"], optional = true }
2222
serde_json = { version = "1.0.46", optional = true }
23+
regex = "1.3.4"
24+
lazy_static = "1.4.0"
25+
26+
# This is a temporary work around to fix a Failure-derive compilation error
27+
# Should be removed when https://github.com/Empty2k12/influxdb-rust/issues/48 is being done
28+
quote = "=1.0.2"
2329

2430
[features]
2531
use-serde = ["serde", "serde_json"]

Diff for: src/query/line_proto_term.rs

+88
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/// InfluxDB Line Protocol escaping helper module.
2+
/// https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/
3+
use crate::Type;
4+
use lazy_static::lazy_static;
5+
use regex::Regex;
6+
7+
lazy_static! {
8+
pub static ref COMMAS_SPACES: Regex = Regex::new("[, ]").unwrap();
9+
pub static ref COMMAS_SPACES_EQUALS: Regex = Regex::new("[, =]").unwrap();
10+
pub static ref QUOTES_SLASHES: Regex = Regex::new(r#"["\\]"#).unwrap();
11+
}
12+
13+
pub enum LineProtoTerm<'a> {
14+
Measurement(&'a str), // escape commas, spaces
15+
TagKey(&'a str), // escape commas, equals, spaces
16+
TagValue(&'a str), // escape commas, equals, spaces
17+
FieldKey(&'a str), // escape commas, equals, spaces
18+
FieldValue(&'a Type), // escape quotes, backslashes + quote
19+
}
20+
21+
impl LineProtoTerm<'_> {
22+
pub fn escape(self) -> String {
23+
use LineProtoTerm::*;
24+
match self {
25+
Measurement(x) => Self::escape_any(x, &*COMMAS_SPACES),
26+
TagKey(x) | TagValue(x) | FieldKey(x) => Self::escape_any(x, &*COMMAS_SPACES_EQUALS),
27+
FieldValue(x) => Self::escape_field_value(x),
28+
}
29+
}
30+
31+
fn escape_field_value(v: &Type) -> String {
32+
use Type::*;
33+
match v {
34+
Boolean(v) => {
35+
if *v {
36+
"true"
37+
} else {
38+
"false"
39+
}
40+
}
41+
.to_string(),
42+
Float(v) => v.to_string(),
43+
SignedInteger(v) => format!("{}i", v),
44+
UnsignedInteger(v) => format!("{}i", v),
45+
Text(v) => format!("\"{}\"", Self::escape_any(v, &*QUOTES_SLASHES)),
46+
}
47+
}
48+
49+
fn escape_any(s: &str, re: &Regex) -> String {
50+
re.replace_all(s, r#"\$0"#).to_string()
51+
}
52+
}
53+
54+
#[cfg(test)]
55+
mod test {
56+
use crate::query::line_proto_term::LineProtoTerm::*;
57+
use crate::Type;
58+
59+
#[test]
60+
fn test() {
61+
assert_eq!(Measurement(r#"wea", ther"#).escape(), r#"wea"\,\ ther"#);
62+
assert_eq!(TagKey(r#"locat\ ,=ion"#).escape(), r#"locat\\ \,\=ion"#);
63+
64+
assert_eq!(FieldValue(&Type::Boolean(true)).escape(), r#"true"#);
65+
assert_eq!(FieldValue(&Type::Boolean(false)).escape(), r#"false"#);
66+
67+
assert_eq!(FieldValue(&Type::Float(0.0)).escape(), r#"0"#);
68+
assert_eq!(FieldValue(&Type::Float(-0.1)).escape(), r#"-0.1"#);
69+
70+
assert_eq!(FieldValue(&Type::SignedInteger(0)).escape(), r#"0i"#);
71+
assert_eq!(FieldValue(&Type::SignedInteger(83)).escape(), r#"83i"#);
72+
73+
assert_eq!(FieldValue(&Type::Text("".into())).escape(), r#""""#);
74+
assert_eq!(FieldValue(&Type::Text("0".into())).escape(), r#""0""#);
75+
assert_eq!(FieldValue(&Type::Text("\"".into())).escape(), r#""\"""#);
76+
assert_eq!(
77+
FieldValue(&Type::Text(r#"locat"\ ,=ion"#.into())).escape(),
78+
r#""locat\"\\ ,=ion""#
79+
);
80+
}
81+
82+
#[test]
83+
fn test_empty_tag_value() {
84+
// InfluxDB doesn't support empty tag values. But that's a job
85+
// of a calling site to validate an entire write request.
86+
assert_eq!(TagValue("").escape(), "");
87+
}
88+
}

Diff for: src/query/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use std::convert::TryInto;
2929

3030
#[cfg(feature = "chrono_timestamps")]
3131
pub mod consts;
32+
mod line_proto_term;
3233
pub mod read_query;
3334
pub mod write_query;
3435
use std::fmt;

Diff for: src/query/write_query.rs

+44-13
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,26 @@
22
//!
33
//! Can only be instantiated by using Query::write_query
44
5+
use crate::query::line_proto_term::LineProtoTerm;
56
use crate::query::{QueryType, ValidQuery};
67
use crate::{Error, Query, Timestamp};
78
use std::fmt::{Display, Formatter};
89

910
// todo: batch write queries
1011

1112
pub trait WriteField {
12-
fn add_to_fields(self, tag: String, fields: &mut Vec<(String, String)>);
13+
fn add_to_fields(self, tag: String, fields: &mut Vec<(String, Type)>);
1314
}
1415

1516
impl<T: Into<Type>> WriteField for T {
16-
fn add_to_fields(self, tag: String, fields: &mut Vec<(String, String)>) {
17+
fn add_to_fields(self, tag: String, fields: &mut Vec<(String, Type)>) {
1718
let val: Type = self.into();
18-
fields.push((tag, val.to_string()));
19+
fields.push((tag, val));
1920
}
2021
}
2122

2223
impl<T: Into<Type>> WriteField for Option<T> {
23-
fn add_to_fields(self, tag: String, fields: &mut Vec<(String, String)>) {
24+
fn add_to_fields(self, tag: String, fields: &mut Vec<(String, Type)>) {
2425
if let Some(val) = self {
2526
val.add_to_fields(tag, fields);
2627
}
@@ -29,7 +30,7 @@ impl<T: Into<Type>> WriteField for Option<T> {
2930

3031
/// Internal Representation of a Write query that has not yet been built
3132
pub struct WriteQuery {
32-
fields: Vec<(String, String)>,
33+
fields: Vec<(String, Type)>,
3334
tags: Vec<(String, String)>,
3435
measurement: String,
3536
timestamp: Timestamp,
@@ -121,7 +122,7 @@ impl Display for Type {
121122
Float(x) => write!(f, "{}", x),
122123
SignedInteger(x) => write!(f, "{}", x),
123124
UnsignedInteger(x) => write!(f, "{}", x),
124-
Text(text) => write!(f, "\"{text}\"", text = text),
125+
Text(text) => write!(f, "{text}", text = text),
125126
}
126127
}
127128
}
@@ -159,22 +160,35 @@ impl Query for WriteQuery {
159160
let mut tags = self
160161
.tags
161162
.iter()
162-
.map(|(tag, value)| format!("{tag}={value}", tag = tag, value = value))
163+
.map(|(tag, value)| {
164+
format!(
165+
"{tag}={value}",
166+
tag = LineProtoTerm::TagKey(tag).escape(),
167+
value = LineProtoTerm::TagValue(value).escape(),
168+
)
169+
})
163170
.collect::<Vec<String>>()
164171
.join(",");
172+
165173
if !tags.is_empty() {
166174
tags.insert_str(0, ",");
167175
}
168176
let fields = self
169177
.fields
170178
.iter()
171-
.map(|(field, value)| format!("{field}={value}", field = field, value = value))
179+
.map(|(field, value)| {
180+
format!(
181+
"{field}={value}",
182+
field = LineProtoTerm::FieldKey(field).escape(),
183+
value = LineProtoTerm::FieldValue(value).escape(),
184+
)
185+
})
172186
.collect::<Vec<String>>()
173187
.join(",");
174188

175189
Ok(ValidQuery(format!(
176190
"{measurement}{tags} {fields}{time}",
177-
measurement = self.measurement,
191+
measurement = LineProtoTerm::Measurement(&self.measurement).escape(),
178192
tags = tags,
179193
fields = fields,
180194
time = match self.timestamp {
@@ -207,7 +221,7 @@ mod tests {
207221
.build();
208222

209223
assert!(query.is_ok(), "Query was empty");
210-
assert_eq!(query.unwrap(), "weather temperature=82 11");
224+
assert_eq!(query.unwrap(), "weather temperature=82i 11");
211225
}
212226

213227
#[test]
@@ -220,7 +234,7 @@ mod tests {
220234
assert!(query.is_ok(), "Query was empty");
221235
assert_eq!(
222236
query.unwrap(),
223-
"weather temperature=82,wind_strength=3.7 11"
237+
"weather temperature=82i,wind_strength=3.7 11"
224238
);
225239
}
226240

@@ -232,7 +246,7 @@ mod tests {
232246
.build();
233247

234248
assert!(query.is_ok(), "Query was empty");
235-
assert_eq!(query.unwrap(), "weather temperature=82 11");
249+
assert_eq!(query.unwrap(), "weather temperature=82i 11");
236250
}
237251

238252
#[test]
@@ -255,7 +269,7 @@ mod tests {
255269
assert!(query.is_ok(), "Query was empty");
256270
assert_eq!(
257271
query.unwrap(),
258-
"weather,location=\"us-midwest\",season=\"summer\" temperature=82 11"
272+
"weather,location=us-midwest,season=summer temperature=82i 11"
259273
);
260274
}
261275

@@ -270,4 +284,21 @@ mod tests {
270284

271285
assert_eq!(query.get_type(), QueryType::WriteQuery);
272286
}
287+
288+
#[test]
289+
fn test_escaping() {
290+
let query = Query::write_query(Timestamp::Hours(11), "wea, ther=")
291+
.add_field("temperature", 82)
292+
.add_field("\"temp=era,t ure\"", r#"too"\\hot"#)
293+
.add_field("float", 82.0)
294+
.add_tag("location", "us-midwest")
295+
.add_tag("loc, =\"ation", "us, \"mid=west\"")
296+
.build();
297+
298+
assert!(query.is_ok(), "Query was empty");
299+
assert_eq!(
300+
query.unwrap().get(),
301+
r#"wea\,\ ther=,location=us-midwest,loc\,\ \="ation=us\,\ "mid\=west" temperature=82i,"temp\=era\,t\ ure"="too\"\\\\hot",float=82 11"#
302+
);
303+
}
273304
}

0 commit comments

Comments
 (0)