Commit 7e01edb

fix: async write prec where DEFAULT_PRECISION should not be used (#675)
* fix: (WIP) issue 669 write precision to default in async API
* chore: fix lint issues
* docs: update CHANGELOG.md
* chore: improve indexing of range
1 parent 28a4a04 commit 7e01edb
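The fix is easiest to see from the caller's side: before this commit, `WriteApiAsync.write()` serialized every record with the call-level `write_precision` (falling back to the default, nanoseconds) and posted a single body, so a `WritePrecision` set on an individual `Point` was ignored. Below is a minimal usage sketch, not part of this commit, assuming a local InfluxDB at `http://localhost:8086` with placeholder token/org/bucket values:

```python
# Illustrative sketch only (not commit code): two points that carry their own
# WritePrecision, written through the async API in a single call. After #675 the
# client groups records by the precision on each Point and issues one request per
# group instead of serializing everything with the call-level/default precision.
import asyncio

from influxdb_client import Point, WritePrecision
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


async def main():
    async with InfluxDBClientAsync(url="http://localhost:8086",
                                   token="my-token", org="my-org") as client:
        points = [
            Point("demo").tag("method", "sec").field("temperature", 25.3)
                .time(1_700_000_000, write_precision=WritePrecision.S),
            Point("demo").tag("method", "nano").field("temperature", 24.3)
                .time(1_700_000_000_000_000_000, write_precision=WritePrecision.NS),
        ]
        ok = await client.write_api().write(bucket="my-bucket", record=points)
        print(ok)  # True when every per-precision request returned 201/204


asyncio.run(main())
```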

File tree

3 files changed: +159 -23 lines changed

CHANGELOG.md
influxdb_client/client/write_api_async.py
tests/test_InfluxDBClientAsync.py

CHANGELOG.md (+4 -2)

@@ -1,8 +1,10 @@
 ## 1.47.0 [unreleased]

 ### Bug Fixes
-1. [#672](https://github.com/influxdata/influxdb-client-python/pull/672): Add type validation to url attribute in client object
-1. [#674](https://github.com/influxdata/influxdb-client-python/pull/674): Add type linting to client.flux_table.FluxTable, remove duplicated `from pathlib import Path` at setup.py
+
+1. [#672](https://github.com/influxdata/influxdb-client-python/pull/672): Adding type validation to url attribute in client object
+2. [#674](https://github.com/influxdata/influxdb-client-python/pull/674): Add type linting to client.flux_table.FluxTable, remove duplicated `from pathlib import Path` at setup.py
+3. [#675](https://github.com/influxdata/influxdb-client-python/pull/675): Ensures WritePrecision in Point is preferred to `DEFAULT_PRECISION`

 ## 1.46.0 [2024-09-13]

influxdb_client/client/write_api_async.py (+18 -9)

@@ -1,5 +1,6 @@
 """Collect and async write time series data to InfluxDB Cloud or InfluxDB OSS."""
 import logging
+from asyncio import ensure_future, gather
 from collections import defaultdict
 from typing import Union, Iterable, NamedTuple

@@ -114,12 +115,20 @@ async def write(self, bucket: str, org: str = None,
             self._append_default_tags(record)

         payloads = defaultdict(list)
-        self._serialize(record, write_precision, payloads, precision_from_point=False, **kwargs)
-
-        # joint list by \n
-        body = b'\n'.join(payloads[write_precision])
-        response = await self._write_service.post_write_async(org=org, bucket=bucket, body=body,
-                                                              precision=write_precision, async_req=False,
-                                                              _return_http_data_only=False,
-                                                              content_type="text/plain; charset=utf-8")
-        return response[1] in (201, 204)
+        self._serialize(record, write_precision, payloads, precision_from_point=True, **kwargs)
+
+        futures = []
+        for payload_precision, payload_line in payloads.items():
+            futures.append(ensure_future
+                           (self._write_service.post_write_async(org=org, bucket=bucket,
+                                                                 body=b'\n'.join(payload_line),
+                                                                 precision=payload_precision, async_req=False,
+                                                                 _return_http_data_only=False,
+                                                                 content_type="text/plain; charset=utf-8")))
+
+        results = await gather(*futures, return_exceptions=True)
+        for result in results:
+            if isinstance(result, Exception):
+                raise result
+
+        return False not in [re[1] in (201, 204) for re in results]
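The rewritten `write()` fans requests out per precision: `_serialize(..., precision_from_point=True)` buckets line-protocol strings by each point's own precision, one POST is issued per bucket, and `gather(..., return_exceptions=True)` collects the responses without letting one failure cancel its siblings, re-raising the first exception afterwards. A self-contained sketch of that pattern, with a hypothetical `post()` coroutine standing in for `_write_service.post_write_async()`:

```python
# Sketch of the group-and-gather pattern used in the new write() (assumed names,
# not library code): post() is a made-up stand-in that returns an HTTP status.
import asyncio
from collections import defaultdict


async def post(precision: str, body: bytes) -> int:
    await asyncio.sleep(0)  # pretend to perform network I/O
    return 204


async def write_grouped(payloads: dict) -> bool:
    # One task per precision group, mirroring the loop in the real write().
    futures = [asyncio.ensure_future(post(precision, b'\n'.join(lines)))
               for precision, lines in payloads.items()]
    # return_exceptions=True keeps a failed request from cancelling the others...
    results = await asyncio.gather(*futures, return_exceptions=True)
    for result in results:
        if isinstance(result, Exception):
            raise result  # ...while still surfacing failures to the caller
    return all(status in (201, 204) for status in results)


async def main():
    payloads = defaultdict(list)
    payloads["s"].append(b"demo,method=sec temperature=25.3 1700000000")
    payloads["ns"].append(b"demo,method=nano temperature=24.3 1700000000000000000")
    print(await write_grouped(payloads))  # True


asyncio.run(main())
```

Compared with the old single-request path, the boolean result now reflects every per-precision request succeeding with 201/204, not just one.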

tests/test_InfluxDBClientAsync.py (+137 -12)

@@ -1,11 +1,15 @@
 import asyncio
+import dateutil.parser
 import logging
+import math
 import re
+import time
 import unittest
 import os
 from datetime import datetime, timezone
 from io import StringIO

+import pandas
 import pytest
 import warnings
 from aioresponses import aioresponses
@@ -199,30 +203,151 @@ async def test_write_empty_data(self):

         self.assertEqual(True, response)

+    def gen_fractional_utc(self, nano, precision) -> str:
+        raw_sec = nano / 1_000_000_000
+        if precision == WritePrecision.NS:
+            rem = f"{nano % 1_000_000_000}".rjust(9,"0").rstrip("0")
+            return (datetime.fromtimestamp(math.floor(raw_sec), tz=timezone.utc)
+                    .isoformat()
+                    .replace("+00:00", "") + f".{rem}Z")
+                    #f".{rem}Z"))
+        elif precision == WritePrecision.US:
+            # rem = f"{round(nano / 1_000) % 1_000_000}"#.ljust(6,"0")
+            return (datetime.fromtimestamp(round(raw_sec,6), tz=timezone.utc)
+                    .isoformat()
+                    .replace("+00:00","")
+                    .strip("0") + "Z"
+                    )
+        elif precision == WritePrecision.MS:
+            #rem = f"{round(nano / 1_000_000) % 1_000}".rjust(3, "0")
+            return (datetime.fromtimestamp(round(raw_sec,3), tz=timezone.utc)
+                    .isoformat()
+                    .replace("+00:00","")
+                    .strip("0") + "Z"
+                    )
+        elif precision == WritePrecision.S:
+            return (datetime.fromtimestamp(round(raw_sec), tz=timezone.utc)
+                    .isoformat()
+                    .replace("+00:00","Z"))
+        else:
+            raise ValueError(f"Unknown precision: {precision}")
+
     @async_test
     async def test_write_points_different_precision(self):
+        now_ns = time.time_ns()
+        now_us = now_ns / 1_000
+        now_ms = now_us / 1_000
+        now_s = now_ms / 1_000
+
+        now_date_s = self.gen_fractional_utc(now_ns, WritePrecision.S)
+        now_date_ms = self.gen_fractional_utc(now_ns, WritePrecision.MS)
+        now_date_us = self.gen_fractional_utc(now_ns, WritePrecision.US)
+        now_date_ns = self.gen_fractional_utc(now_ns, WritePrecision.NS)
+
+        points = {
+            WritePrecision.S: [],
+            WritePrecision.MS: [],
+            WritePrecision.US: [],
+            WritePrecision.NS: []
+        }
+
+        expected = {}
+
         measurement = generate_name("measurement")
-        _point1 = Point(measurement).tag("location", "Prague").field("temperature", 25.3) \
-            .time(datetime.fromtimestamp(0, tz=timezone.utc), write_precision=WritePrecision.S)
-        _point2 = Point(measurement).tag("location", "New York").field("temperature", 24.3) \
-            .time(datetime.fromtimestamp(1, tz=timezone.utc), write_precision=WritePrecision.MS)
-        _point3 = Point(measurement).tag("location", "Berlin").field("temperature", 24.3) \
-            .time(datetime.fromtimestamp(2, tz=timezone.utc), write_precision=WritePrecision.NS)
-        await self.client.write_api().write(bucket="my-bucket", record=[_point1, _point2, _point3],
+        # basic date-time value
+        points[WritePrecision.S].append(Point(measurement).tag("method", "SecDateTime").field("temperature", 25.3) \
+            .time(datetime.fromtimestamp(round(now_s), tz=timezone.utc), write_precision=WritePrecision.S))
+        expected['SecDateTime'] = now_date_s
+        points[WritePrecision.MS].append(Point(measurement).tag("method", "MilDateTime").field("temperature", 24.3) \
+            .time(datetime.fromtimestamp(round(now_s,3), tz=timezone.utc), write_precision=WritePrecision.MS))
+        expected['MilDateTime'] = now_date_ms
+        points[WritePrecision.US].append(Point(measurement).tag("method", "MicDateTime").field("temperature", 24.3) \
+            .time(datetime.fromtimestamp(round(now_s,6), tz=timezone.utc), write_precision=WritePrecision.US))
+        expected['MicDateTime'] = now_date_us
+        # N.B. datetime does not handle nanoseconds
+        # points[WritePrecision.NS].append(Point(measurement).tag("method", "NanDateTime").field("temperature", 24.3) \
+        #     .time(datetime.fromtimestamp(now_s, tz=timezone.utc), write_precision=WritePrecision.NS))
+
+        # long timestamps based on POSIX time
+        points[WritePrecision.S].append(Point(measurement).tag("method", "SecPosix").field("temperature", 24.3) \
+            .time(round(now_s), write_precision=WritePrecision.S))
+        expected['SecPosix'] = now_date_s
+        points[WritePrecision.MS].append(Point(measurement).tag("method", "MilPosix").field("temperature", 24.3) \
+            .time(round(now_ms), write_precision=WritePrecision.MS))
+        expected['MilPosix'] = now_date_ms
+        points[WritePrecision.US].append(Point(measurement).tag("method", "MicPosix").field("temperature", 24.3) \
+            .time(round(now_us), write_precision=WritePrecision.US))
+        expected['MicPosix'] = now_date_us
+        points[WritePrecision.NS].append(Point(measurement).tag("method", "NanPosix").field("temperature", 24.3) \
+            .time(now_ns, write_precision=WritePrecision.NS))
+        expected['NanPosix'] = now_date_ns
+
+        # ISO Zulu datetime with ms, us and ns e.g. "2024-09-27T13:17:16.412399728Z"
+        points[WritePrecision.S].append(Point(measurement).tag("method", "SecDTZulu").field("temperature", 24.3) \
+            .time(now_date_s, write_precision=WritePrecision.S))
+        expected['SecDTZulu'] = now_date_s
+        points[WritePrecision.MS].append(Point(measurement).tag("method", "MilDTZulu").field("temperature", 24.3) \
+            .time(now_date_ms, write_precision=WritePrecision.MS))
+        expected['MilDTZulu'] = now_date_ms
+        points[WritePrecision.US].append(Point(measurement).tag("method", "MicDTZulu").field("temperature", 24.3) \
+            .time(now_date_us, write_precision=WritePrecision.US))
+        expected['MicDTZulu'] = now_date_us
+        # This keeps resulting in micro second resolution in response
+        # points[WritePrecision.NS].append(Point(measurement).tag("method", "NanDTZulu").field("temperature", 24.3) \
+        #     .time(now_date_ns, write_precision=WritePrecision.NS))
+
+        recs = [x for x in [v for v in points.values()]]
+
+        await self.client.write_api().write(bucket="my-bucket", record=recs,
                                             write_precision=WritePrecision.NS)
         query = f'''
                 from(bucket:"my-bucket")
                     |> range(start: 0)
                     |> filter(fn: (r) => r["_measurement"] == "{measurement}")
-                    |> keep(columns: ["_time"])
+                    |> keep(columns: ["method","_time"])
                 '''
         query_api = self.client.query_api()

+        # ensure calls fully processed on server
+        await asyncio.sleep(1)
+
         raw = await query_api.query_raw(query)
-        self.assertEqual(8, len(raw.splitlines()))
-        self.assertEqual(',,0,1970-01-01T00:00:02Z', raw.splitlines()[4])
-        self.assertEqual(',,0,1970-01-01T00:00:01Z', raw.splitlines()[5])
-        self.assertEqual(',,0,1970-01-01T00:00:00Z', raw.splitlines()[6])
+        linesRaw = raw.splitlines()[4:]
+
+        lines = []
+        for lnr in linesRaw:
+            lines.append(lnr[2:].split(","))
+
+        def get_time_for_method(lines, method):
+            for l in lines:
+                if l[2] == method:
+                    return l[1]
+            return ""
+
+        self.assertEqual(15, len(raw.splitlines()))
+
+        for key in expected:
+            t = get_time_for_method(lines,key)
+            comp_time = dateutil.parser.isoparse(get_time_for_method(lines,key))
+            target_time = dateutil.parser.isoparse(expected[key])
+            self.assertEqual(target_time.date(), comp_time.date())
+            self.assertEqual(target_time.hour, comp_time.hour)
+            self.assertEqual(target_time.second,comp_time.second)
+            dif = abs(target_time.microsecond - comp_time.microsecond)
+            if key[:3] == "Sec":
+                # Already tested
+                pass
+            elif key[:3] == "Mil":
+                # may be slight rounding differences
+                self.assertLess(dif, 1500, f"failed to match timestamp for {key} {target_time} != {comp_time}")
+            elif key[:3] == "Mic":
+                # may be slight rounding differences
+                self.assertLess(dif, 150, f"failed to match timestamp for {key} {target_time} != {comp_time}")
+            elif key[:3] == "Nan":
+                self.assertEqual(expected[key], get_time_for_method(lines, key))
+            else:
+                raise Exception(f"Unhandled key {key}")

     @async_test
     async def test_delete_api(self):