Skip to content

Commit 8f4e591

Browse files
authored
Opt-in Line-Protocol for Compatibility Mode (#123)
1 parent 0aface6 commit 8f4e591

File tree

5 files changed

+133
-10
lines changed

5 files changed

+133
-10
lines changed

influxdb/src/client/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ impl Client {
131131
/// Add authorization token to [`Client`](crate::Client)
132132
///
133133
/// This is designed for influxdb 2.0's backward-compatible API which
134-
/// requires authrozation by default. You can create such token from
134+
/// requires authorization by default. You can create such token from
135135
/// console of influxdb 2.0 .
136136
pub fn with_token<S>(mut self, token: S) -> Self
137137
where

influxdb/src/query/line_proto_term.rs

+25-3
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,22 @@ impl LineProtoTerm<'_> {
2525
match self {
2626
Measurement(x) => Self::escape_any(x, &COMMAS_SPACES),
2727
TagKey(x) | FieldKey(x) => Self::escape_any(x, &COMMAS_SPACES_EQUALS),
28-
FieldValue(x) => Self::escape_field_value(x),
28+
FieldValue(x) => Self::escape_field_value(x, false),
2929
TagValue(x) => Self::escape_tag_value(x),
3030
}
3131
}
3232

33-
fn escape_field_value(v: &Type) -> String {
33+
pub fn escape_v2(self) -> String {
34+
use LineProtoTerm::*;
35+
match self {
36+
Measurement(x) => Self::escape_any(x, &COMMAS_SPACES),
37+
TagKey(x) | FieldKey(x) => Self::escape_any(x, &COMMAS_SPACES_EQUALS),
38+
FieldValue(x) => Self::escape_field_value(x, true),
39+
TagValue(x) => Self::escape_tag_value(x),
40+
}
41+
}
42+
43+
fn escape_field_value(v: &Type, use_v2: bool) -> String {
3444
use Type::*;
3545
match v {
3646
Boolean(v) => {
@@ -43,7 +53,13 @@ impl LineProtoTerm<'_> {
4353
.to_string(),
4454
Float(v) => v.to_string(),
4555
SignedInteger(v) => format!("{}i", v),
46-
UnsignedInteger(v) => format!("{}u", v),
56+
UnsignedInteger(v) => {
57+
if use_v2 {
58+
format!("{}u", v)
59+
} else {
60+
format!("{}i", v)
61+
}
62+
}
4763
Text(v) => format!(r#""{}""#, Self::escape_any(v, &QUOTES_SLASHES)),
4864
}
4965
}
@@ -112,6 +128,12 @@ mod test {
112128
assert_eq!(FieldValue(&Type::SignedInteger(0)).escape(), r#"0i"#);
113129
assert_eq!(FieldValue(&Type::SignedInteger(83)).escape(), r#"83i"#);
114130

131+
assert_eq!(FieldValue(&Type::UnsignedInteger(0)).escape(), r#"0i"#);
132+
assert_eq!(FieldValue(&Type::UnsignedInteger(83)).escape(), r#"83i"#);
133+
134+
assert_eq!(FieldValue(&Type::UnsignedInteger(0)).escape_v2(), r#"0u"#);
135+
assert_eq!(FieldValue(&Type::UnsignedInteger(83)).escape_v2(), r#"83u"#);
136+
115137
assert_eq!(FieldValue(&Type::Text("".into())).escape(), r#""""#);
116138
assert_eq!(FieldValue(&Type::Text("0".into())).escape(), r#""0""#);
117139
assert_eq!(FieldValue(&Type::Text("\"".into())).escape(), r#""\"""#);

influxdb/src/query/mod.rs

+29-1
Original file line numberDiff line numberDiff line change
@@ -112,12 +112,36 @@ pub trait Query {
112112
/// ```
113113
fn build(&self) -> Result<ValidQuery, Error>;
114114

115+
/// Like [build] but with additional support for unsigned integers in the line protocol.
116+
/// Please note, this crate can only interact with InfluxDB 2.0 in compatibility mode
117+
/// and does not natively support InfluxDB 2.0.
118+
///
119+
/// # Examples
120+
///
121+
/// ```rust
122+
/// use influxdb::{Query, Timestamp};
123+
/// use influxdb::InfluxDbWriteable;
124+
///
125+
/// let use_v2 = true;
126+
///
127+
/// let invalid_query = Timestamp::Nanoseconds(0).into_query("measurement").build_with_opts(use_v2);
128+
/// assert!(invalid_query.is_err());
129+
///
130+
/// let valid_query = Timestamp::Nanoseconds(0).into_query("measurement").add_field("myfield1", 11).build_with_opts(use_v2);
131+
/// assert!(valid_query.is_ok());
132+
/// ```
133+
fn build_with_opts(&self, use_v2: bool) -> Result<ValidQuery, Error>;
134+
115135
fn get_type(&self) -> QueryType;
116136
}
117137

118138
impl<Q: Query> Query for &Q {
119139
fn build(&self) -> Result<ValidQuery, Error> {
120-
Q::build(self)
140+
Q::build_with_opts(self, false)
141+
}
142+
143+
fn build_with_opts(&self, use_v2: bool) -> Result<ValidQuery, Error> {
144+
Q::build_with_opts(self, use_v2)
121145
}
122146

123147
fn get_type(&self) -> QueryType {
@@ -130,6 +154,10 @@ impl<Q: Query> Query for Box<Q> {
130154
Q::build(self)
131155
}
132156

157+
fn build_with_opts(&self, use_v2: bool) -> Result<ValidQuery, Error> {
158+
Q::build_with_opts(self, use_v2)
159+
}
160+
133161
fn get_type(&self) -> QueryType {
134162
Q::get_type(self)
135163
}

influxdb/src/query/read_query.rs

+4
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ impl Query for ReadQuery {
3838
Ok(ValidQuery(self.queries.join(";")))
3939
}
4040

41+
fn build_with_opts(&self, _use_v2: bool) -> Result<ValidQuery, Error> {
42+
Ok(ValidQuery(self.queries.join(";")))
43+
}
44+
4145
fn get_type(&self) -> QueryType {
4246
QueryType::ReadQuery
4347
}

influxdb/src/query/write_query.rs

+74-5
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,10 @@ where
163163

164164
impl Query for WriteQuery {
165165
fn build(&self) -> Result<ValidQuery, Error> {
166+
self.build_with_opts(false)
167+
}
168+
169+
fn build_with_opts(&self, use_v2: bool) -> Result<ValidQuery, Error> {
166170
if self.fields.is_empty() {
167171
return Err(Error::InvalidQueryError {
168172
error: "fields cannot be empty".to_string(),
@@ -173,10 +177,20 @@ impl Query for WriteQuery {
173177
.tags
174178
.iter()
175179
.map(|(tag, value)| {
180+
let escaped_tag_key = if use_v2 {
181+
LineProtoTerm::TagKey(tag).escape_v2()
182+
} else {
183+
LineProtoTerm::TagKey(tag).escape()
184+
};
185+
let escaped_tag_value = if use_v2 {
186+
LineProtoTerm::TagValue(value).escape_v2()
187+
} else {
188+
LineProtoTerm::TagValue(value).escape()
189+
};
176190
format!(
177191
"{tag}={value}",
178-
tag = LineProtoTerm::TagKey(tag).escape(),
179-
value = LineProtoTerm::TagValue(value).escape(),
192+
tag = escaped_tag_key,
193+
value = escaped_tag_value,
180194
)
181195
})
182196
.collect::<Vec<String>>()
@@ -189,18 +203,34 @@ impl Query for WriteQuery {
189203
.fields
190204
.iter()
191205
.map(|(field, value)| {
206+
let escaped_field_key = if use_v2 {
207+
LineProtoTerm::FieldKey(field).escape_v2()
208+
} else {
209+
LineProtoTerm::FieldKey(field).escape()
210+
};
211+
let escaped_field_value = if use_v2 {
212+
LineProtoTerm::FieldValue(value).escape_v2()
213+
} else {
214+
LineProtoTerm::FieldValue(value).escape()
215+
};
192216
format!(
193217
"{field}={value}",
194-
field = LineProtoTerm::FieldKey(field).escape(),
195-
value = LineProtoTerm::FieldValue(value).escape(),
218+
field = escaped_field_key,
219+
value = escaped_field_value,
196220
)
197221
})
198222
.collect::<Vec<String>>()
199223
.join(",");
200224

225+
let escaped_measurement = if use_v2 {
226+
LineProtoTerm::Measurement(&self.measurement).escape_v2()
227+
} else {
228+
LineProtoTerm::Measurement(&self.measurement).escape()
229+
};
230+
201231
Ok(ValidQuery(format!(
202232
"{measurement}{tags} {fields} {time}",
203-
measurement = LineProtoTerm::Measurement(&self.measurement).escape(),
233+
measurement = escaped_measurement,
204234
tags = tags,
205235
fields = fields,
206236
time = self.timestamp
@@ -224,6 +254,17 @@ impl Query for Vec<WriteQuery> {
224254
Ok(ValidQuery(qlines.join("\n")))
225255
}
226256

257+
fn build_with_opts(&self, use_v2: bool) -> Result<ValidQuery, Error> {
258+
let mut qlines = Vec::new();
259+
260+
for q in self {
261+
let valid_query = q.build_with_opts(use_v2)?;
262+
qlines.push(valid_query.0);
263+
}
264+
265+
Ok(ValidQuery(qlines.join("\n")))
266+
}
267+
227268
fn get_type(&self) -> QueryType {
228269
QueryType::WriteQuery(
229270
self.get(0)
@@ -267,6 +308,22 @@ mod tests {
267308
.add_field("temperature_unsigned", 82u64)
268309
.build();
269310

311+
assert!(query.is_ok(), "Query was empty");
312+
assert_eq!(
313+
query.unwrap(),
314+
"weather temperature=82i,wind_strength=3.7,temperature_unsigned=82i 11"
315+
);
316+
}
317+
318+
#[test]
319+
fn test_write_builder_multiple_fields_with_v2() {
320+
let query = Timestamp::Hours(11)
321+
.into_query("weather".to_string())
322+
.add_field("temperature", 82)
323+
.add_field("wind_strength", 3.7)
324+
.add_field("temperature_unsigned", 82u64)
325+
.build_with_opts(true);
326+
270327
assert!(query.is_ok(), "Query was empty");
271328
assert_eq!(
272329
query.unwrap(),
@@ -282,6 +339,18 @@ mod tests {
282339
.add_tag("wind_strength", <Option<u64>>::None)
283340
.build();
284341

342+
assert!(query.is_ok(), "Query was empty");
343+
assert_eq!(query.unwrap(), "weather temperature=82i 11");
344+
}
345+
346+
#[test]
347+
fn test_write_builder_optional_fields_with_v2() {
348+
let query = Timestamp::Hours(11)
349+
.into_query("weather".to_string())
350+
.add_field("temperature", 82u64)
351+
.add_tag("wind_strength", <Option<u64>>::None)
352+
.build_with_opts(true);
353+
285354
assert!(query.is_ok(), "Query was empty");
286355
assert_eq!(query.unwrap(), "weather temperature=82u 11");
287356
}

0 commit comments

Comments
 (0)