Skip to content

Commit 7ad9533

Browse files
authored
feat: add headers field to InfluxDBError and add example of use (#665)
1 parent 45e6607 commit 7ad9533

File tree

6 files changed

+186
-0
lines changed

6 files changed

+186
-0
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
### Examples:
44
1. [#664](https://github.com/influxdata/influxdb-client-python/pull/664/): Multiprocessing example uses new source of data
5+
1. [#665](https://github.com/influxdata/influxdb-client-python/pull/665): Shows how to leverage header fields in errors returned on write.
56

67
## 1.45.0 [2024-08-12]
78

examples/README.md

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
- manually download [NYC TLC Trip Record Data](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page)
1616
- install Apache Arrow `pip install pyarrow` dependency
1717
- [write_batching_by_bytes_count.py](write_batching_by_bytes_count.py) - How to use RxPY to prepare batches by maximum bytes count.
18+
- [http_error_handling.py](http_error_handling.py) - How to leverage HttpHeader information when errors are returned on write.
1819

1920
## Queries
2021
- [query.py](query.py) - How to query data into `FluxTable`s, `Stream` and `CSV`

examples/http_error_handling.py

+126
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
"""
2+
Illustrates getting header values from Errors that may occur on write.
3+
4+
To test against cloud set the following environment variables:
5+
INFLUX_URL
6+
INFLUX_TOKEN
7+
INFLUX_DATABASE
8+
INFLUX_ORG
9+
10+
...otherwise will run against a standard OSS endpoint.
11+
"""
12+
import asyncio
13+
import os
14+
from typing import MutableMapping
15+
16+
from influxdb_client import InfluxDBClient
17+
from influxdb_client.client.exceptions import InfluxDBError
18+
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
19+
from influxdb_client.client.write_api import SYNCHRONOUS
20+
from influxdb_client.rest import ApiException
21+
22+
23+
def get_envar(key, default):
24+
try:
25+
return os.environ[key]
26+
except:
27+
return default
28+
29+
30+
class Config(object):
31+
32+
def __init__(self):
33+
self.url = get_envar("INFLUX_URL", "http://localhost:8086")
34+
self.token = get_envar("INFLUX_TOKEN", "my-token")
35+
self.bucket = get_envar("INFLUX_DATABASE", "my-bucket")
36+
self.org = get_envar("INFLUX_ORG", "my-org")
37+
38+
def __str__(self):
39+
return (f"config:\n"
40+
f" url: {self.url}\n"
41+
f" token: ****redacted*****\n"
42+
f" bucket: {self.bucket}\n"
43+
f" org: {self.org}\n"
44+
)
45+
46+
47+
# To encapsulate functions used in batch writing
48+
class BatchCB(object):
49+
50+
def success(self, conf: (str, str, str), data: str):
51+
print(f"Write success: {conf}, data: {data}")
52+
53+
def error(self, conf: (str, str, str), data: str, exception: InfluxDBError):
54+
print(f"\nBatch -> Write failed: {conf}, data: {data}, error: {exception.message}")
55+
report_headers(exception.headers)
56+
57+
def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError):
58+
print(f"Write failed but retryable: {conf}, data: {data}, error: {exception}")
59+
60+
61+
# simple reporter that server is available
62+
def report_ping(ping: bool):
63+
if not ping:
64+
raise ValueError("InfluxDB: Failed to ping server")
65+
else:
66+
print("InfluxDB: ready")
67+
68+
69+
# report some useful expected header fields
70+
def report_headers(headers: MutableMapping[str, str]):
71+
print(" Date: ", headers.get("Date"))
72+
print(" X-Influxdb-Build: ", headers.get("X-Influxdb-Build"))
73+
print(" X-Influxdb-Version: ", headers.get("X-Influxdb-Version")) # OSS version, Cloud should be None
74+
print(" X-Platform-Error-Code: ", headers.get("X-Platform-Error-Code")) # OSS invalid, Cloud should be None
75+
print(" Retry-After: ", headers.get("Retry-After")) # Should be None
76+
print(" Trace-Id: ", headers.get("Trace-Id")) # OSS should be None, Cloud should return value
77+
78+
79+
# try a write using a synchronous call
80+
def use_sync(conf: Config):
81+
print("Using sync")
82+
with InfluxDBClient(url=conf.url, token=conf.token, org=conf.org) as client:
83+
report_ping(client.ping())
84+
try:
85+
client.write_api(write_options=SYNCHRONOUS).write(bucket=conf.bucket, record="cpu,location=G4 usage=")
86+
except ApiException as ae:
87+
print("\nSync -> Caught ApiException: ", ae.message)
88+
report_headers(ae.headers)
89+
90+
print("Sync write done")
91+
92+
93+
# try a write using batch API
94+
def use_batch(conf: Config):
95+
print("Using batch")
96+
with InfluxDBClient(url=conf.url, token=conf.token, org=conf.org) as client:
97+
cb = BatchCB()
98+
with client.write_api(success_callback=cb.success,
99+
error_callback=cb.error,
100+
retry_callback=cb.retry) as write_api:
101+
write_api.write(bucket=conf.bucket, record="cpu,location=G9 usage=")
102+
print("Batch write sent")
103+
print("Batch write done")
104+
105+
106+
# try a write using async.io
107+
async def use_async(conf: Config):
108+
print("Using async")
109+
async with InfluxDBClientAsync(url=conf.url, token=conf.token, org=conf.org) as client:
110+
report_ping(await client.ping())
111+
try:
112+
await client.write_api().write(bucket=conf.bucket, record="cpu,location=G7 usage=")
113+
except InfluxDBError as ie:
114+
print("\nAsync -> Caught InfluxDBError: ", ie.message)
115+
report_headers(ie.headers)
116+
print("Async write done")
117+
118+
119+
if __name__ == "__main__":
120+
conf = Config()
121+
print(conf)
122+
use_sync(conf)
123+
print("\n Continuing...\n")
124+
use_batch(conf)
125+
print("\n Continuing...\n")
126+
asyncio.run(use_async(conf))

influxdb_client/client/exceptions.py

+2
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@ def __init__(self, response: HTTPResponse = None, message: str = None):
1616
self.response = response
1717
self.message = self._get_message(response)
1818
if isinstance(response, HTTPResponse): # response is HTTPResponse
19+
self.headers = response.headers
1920
self.retry_after = response.headers.get('Retry-After')
2021
else: # response is RESTResponse
22+
self.headers = response.getheaders()
2123
self.retry_after = response.getheader('Retry-After')
2224
else:
2325
self.response = None

tests/test_InfluxDBClientAsync.py

+19
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import logging
3+
import re
34
import unittest
45
import os
56
from datetime import datetime, timezone
@@ -390,6 +391,24 @@ async def test_query_exception_propagation(self):
390391
await self.client.query_api().query("buckets()", "my-org")
391392
self.assertEqual("unauthorized access", e.value.message)
392393

394+
@async_test
395+
async def test_write_exception_propagation(self):
396+
await self.client.close()
397+
self.client = InfluxDBClientAsync(url="http://localhost:8086", token="wrong", org="my-org")
398+
399+
with pytest.raises(InfluxDBError) as e:
400+
await self.client.write_api().write(bucket="my_bucket",
401+
record="temperature,location=hic cels=")
402+
self.assertEqual("unauthorized access", e.value.message)
403+
headers = e.value.headers
404+
self.assertIsNotNone(headers)
405+
self.assertIsNotNone(headers.get("Content-Length"))
406+
self.assertIsNotNone(headers.get("Date"))
407+
self.assertIsNotNone(headers.get("X-Platform-Error-Code"))
408+
self.assertIn("application/json", headers.get("Content-Type"))
409+
self.assertTrue(re.compile("^v.*").match(headers.get("X-Influxdb-Version")))
410+
self.assertEqual("OSS", headers.get("X-Influxdb-Build"))
411+
393412
@async_test
394413
@aioresponses()
395414
async def test_parse_utf8_two_bytes_character(self, mocked):

tests/test_WriteApi.py

+37
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,16 @@
33
from __future__ import absolute_import
44

55
import datetime
6+
import json
7+
import logging
68
import os
9+
import re
710
import sys
811
import unittest
912
from collections import namedtuple
1013
from datetime import timedelta
1114
from multiprocessing.pool import ApplyResult
15+
from types import SimpleNamespace
1216

1317
import httpretty
1418
import pytest
@@ -190,6 +194,17 @@ def test_write_error(self):
190194

191195
self.assertEqual(400, exception.status)
192196
self.assertEqual("Bad Request", exception.reason)
197+
# assert headers
198+
self.assertIsNotNone(exception.headers)
199+
self.assertIsNotNone(exception.headers.get("Content-Length"))
200+
self.assertIsNotNone(exception.headers.get("Date"))
201+
self.assertIsNotNone(exception.headers.get("X-Platform-Error-Code"))
202+
self.assertIn("application/json", exception.headers.get("Content-Type"))
203+
self.assertTrue(re.compile("^v.*").match(exception.headers.get("X-Influxdb-Version")))
204+
self.assertEqual("OSS", exception.headers.get("X-Influxdb-Build"))
205+
# assert body
206+
b = json.loads(exception.body, object_hook=lambda d: SimpleNamespace(**d))
207+
self.assertTrue(re.compile("^unable to parse.*invalid field format").match(b.message))
193208

194209
def test_write_dictionary(self):
195210
_bucket = self.create_test_bucket()
@@ -609,6 +624,28 @@ def test_write_result(self):
609624
self.assertEqual(None, result.get())
610625
self.delete_test_bucket(_bucket)
611626

627+
def test_write_error(self):
628+
_bucket = self.create_test_bucket()
629+
630+
_record = "h2o_feet,location=coyote_creek level\\ water_level="
631+
result = self.write_client.write(_bucket.name, self.org, _record)
632+
633+
with self.assertRaises(ApiException) as cm:
634+
result.get()
635+
self.assertEqual(400, cm.exception.status)
636+
self.assertEqual("Bad Request", cm.exception.reason)
637+
# assert headers
638+
self.assertIsNotNone(cm.exception.headers)
639+
self.assertIsNotNone(cm.exception.headers.get("Content-Length"))
640+
self.assertIsNotNone(cm.exception.headers.get("Date"))
641+
self.assertIsNotNone(cm.exception.headers.get("X-Platform-Error-Code"))
642+
self.assertIn("application/json", cm.exception.headers.get("Content-Type"))
643+
self.assertTrue(re.compile("^v.*").match(cm.exception.headers.get("X-Influxdb-Version")))
644+
self.assertEqual("OSS", cm.exception.headers.get("X-Influxdb-Build"))
645+
# assert body
646+
b = json.loads(cm.exception.body, object_hook=lambda d: SimpleNamespace(**d))
647+
self.assertTrue(re.compile("^unable to parse.*missing field value").match(b.message))
648+
612649
def test_write_dictionaries(self):
613650
bucket = self.create_test_bucket()
614651

0 commit comments

Comments
 (0)