Skip to content

Commit 74ed8e1

Browse files
authored
sinks/InfluxDB3: adjust check to work with v2 and v3 (#922)
* adjust check to work with v2 and v3 * small cleanup
1 parent d7e5a33 commit 74ed8e1

File tree

1 file changed

+33
-12
lines changed

1 file changed

+33
-12
lines changed

quixstreams/sinks/core/influxdb3.py

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
import time
44
from datetime import datetime, timezone
55
from typing import Any, Callable, Iterable, Literal, Mapping, Optional, Union, get_args
6+
from urllib.parse import urljoin
7+
8+
import requests
69

710
from quixstreams.models import HeadersTuples
811

@@ -154,6 +157,7 @@ def __init__(
154157
on_client_connect_success=on_client_connect_success,
155158
on_client_connect_failure=on_client_connect_failure,
156159
)
160+
self._request_timeout_ms = request_timeout_ms
157161

158162
if time_precision not in (time_args := get_args(TimePrecision)):
159163
raise ValueError(
@@ -175,7 +179,7 @@ def __init__(
175179
"database": database,
176180
"debug": debug,
177181
"enable_gzip": enable_gzip,
178-
"timeout": request_timeout_ms,
182+
"timeout": self._request_timeout_ms,
179183
"write_client_options": {
180184
"write_options": WriteOptions(
181185
write_type=WriteType.synchronous,
@@ -193,19 +197,36 @@ def __init__(
193197
self._allow_missing_fields = allow_missing_fields
194198
self._convert_ints_to_floats = convert_ints_to_floats
195199

200+
def _get_influx_version(self):
201+
# This validates the token is valid regardless of version
202+
try:
203+
r = requests.get(
204+
urljoin(self._client_args["host"], "ping"),
205+
headers={"Authorization": f"Token {self._client_args['token']}"},
206+
timeout=self._request_timeout_ms / 1000,
207+
)
208+
r.raise_for_status()
209+
except Exception:
210+
logger.error("Ping to InfluxDB failed, likely due to an invalid token.")
211+
raise
212+
version = r.headers.get("X-Influxdb-Version") or r.json()["version"]
213+
return version.split(".")[0][-1]
214+
196215
def setup(self):
197216
self._client = InfluxDBClient3(**self._client_args)
198-
try:
199-
# We cannot safely parameterize the table (measurement) selection, so
200-
# the best we can do is confirm authentication was successful
201-
self._client.query("")
202-
except Exception as e:
203-
e_str = str(e)
204-
if not (
205-
"No SQL statements were provided in the query string" in e_str
206-
or "database not found" in e_str # attempts making db when writing
207-
):
208-
raise
217+
if (version := self._get_influx_version()) == "3":
218+
# additional client functionality/connectivity check with v3
219+
# write and create operations work with v2 due to backwards compatibility
220+
try:
221+
self._client.query("SHOW MEASUREMENTS", language="influxql")
222+
except Exception as e:
223+
if "database not found" not in str(e): # attempts db create on write
224+
raise
225+
else:
226+
logger.warning(
227+
f"connected InfluxDB instance is v{version};"
228+
f"this sink only guarantees functionality with v3."
229+
)
209230

210231
def add(
211232
self,

0 commit comments

Comments
 (0)