Skip to content

Commit 6fa5e33

Browse files
authored
Improve type handling on adding values to timeseries builder as well (#109)
Timeseries builder fixes: - Add automatic type conversion, now supporting any number type (numpy.int64 for example) and not just built-in python types - Null values are not throwing an exception anymore. They are just ignored
1 parent d7ac116 commit 6fa5e33

File tree

3 files changed

+60
-30
lines changed

3 files changed

+60
-30
lines changed

src/PythonClient/src/quixstreams/builders/timeseriesdatabuilder.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import ctypes
2+
import numbers
23
from typing import Union, Dict
34

45
from ..helpers.nativedecorator import nativedecorator
@@ -27,26 +28,25 @@ def __init__(self, net_pointer: ctypes.c_void_p):
2728
def __enter__(self):
2829
self._entered = True
2930

30-
def add_value(self, parameter_id: str, value: Union[str, float, int, bytes, bytearray]) -> 'TimeseriesDataBuilder':
31+
def add_value(self, parameter_id: str, value: Union[numbers.Number, str, bytearray, bytes]) -> 'TimeseriesDataBuilder':
3132
"""
3233
Adds new parameter value at the time the builder is created for.
3334
3435
Args:
3536
parameter_id: The id of the parameter to set the value for.
36-
value: The value of type string, float, int, bytes, or bytearray.
37+
value: The value to add. Can be a number, string, bytearray, or bytes.
3738
3839
Returns:
3940
The builder.
4041
"""
4142

42-
val_type = type(value)
43-
if val_type is int:
43+
if value is None:
44+
return self
45+
46+
if issubclass(type(value), numbers.Number):
4447
value = float(value)
45-
val_type = float
46-
elif val_type is bytearray:
47-
value = bytes(value)
48-
val_type = bytes
4948

49+
val_type = type(value)
5050
if val_type is float:
5151
new = tsdbi(self._interop.AddValue(parameter_id, value))
5252
if new != self._interop:
@@ -57,7 +57,7 @@ def add_value(self, parameter_id: str, value: Union[str, float, int, bytes, byte
5757
if new != self._interop:
5858
self._interop.dispose_ptr__()
5959
self._interop = new
60-
elif val_type is bytes:
60+
elif val_type is bytes or val_type is bytearray:
6161
arr_ptr = Array.WriteBytes(value)
6262
new = tsdbi(self._interop.AddValue3(parameter_id, arr_ptr))
6363
if new != self._interop:

src/PythonClient/tests/quixstreams/integrationtests/test_integration.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import unittest
44
import threading
55
import pandas as pd
6+
import numpy as np
67
from src.quixstreams import Logging, LogLevel, AutoOffsetReset
78

89
from testcontainers.core.container import DockerContainer
@@ -772,7 +773,7 @@ def on_stream_closed(stream: qx.StreamConsumer, end_type: qx.StreamEndType):
772773
print(f"---- Write second stream {output_stream.stream_id} ----")
773774

774775
# Assert
775-
self.waitforresult(event)
776+
self.waitforresult(event, 30)
776777

777778
self.assertEqual(last_stream_read.stream_id, output_stream.stream_id)
778779

@@ -908,6 +909,23 @@ def test_get_or_create_stream_with_prev_stream_without_callback(self):
908909
# endregion
909910

910911
# region timeseries data integration tests
912+
def test_timeseries_builder_works_with_any_number_type_and_none(self):
913+
# Arrange
914+
print("Starting Integration test {}".format(sys._getframe().f_code.co_name))
915+
topic_name = sys._getframe().f_code.co_name # current method name
916+
stream = qx.KafkaStreamingClient(TestIntegration.broker_list, None).get_topic_producer(topic_name).create_stream()
917+
918+
# Act
919+
stream.timeseries.buffer \
920+
.add_timestamp_nanoseconds(1) \
921+
.add_value("npy_float64", np.float64(42.0)) \
922+
.add_value("npy_int64", np.int64(42)) \
923+
.add_value("native_int", int(42)) \
924+
.add_value("native_float", float(42)) \
925+
.add_value("none", None)
926+
927+
# Assert that no exception got raised
928+
911929
def test_parameters_write_binary_read_binary_is_of_bytes(self):
912930
# Arrange
913931
print("Starting Integration test {}".format(sys._getframe().f_code.co_name))

src/PythonClient/tests/quixstreams/unittests/models/test_timeseriesdatatimestamp.py

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,56 +8,68 @@ class TimeseriesDataTimestampTests(unittest.TestCase):
88

99
def test_add_double(self):
1010
# Arrange
11-
pd = TimeseriesData()
11+
td = TimeseriesData()
1212

1313
# Act
14-
pd.add_timestamp_nanoseconds(100) \
14+
td.add_timestamp_nanoseconds(100) \
1515
.add_value("double", 1.232)
1616

1717
# Assert
18-
self.assertEqual(1.232, pd.timestamps[0].parameters["double"].numeric_value)
18+
self.assertEqual(1.232, td.timestamps[0].parameters["double"].numeric_value)
1919

2020
def test_add_string(self):
2121
# Arrange
22-
pd = TimeseriesData()
22+
td = TimeseriesData()
2323

2424
# Act
25-
pd.add_timestamp_nanoseconds(100) \
25+
td.add_timestamp_nanoseconds(100) \
2626
.add_value("str", "value")
2727

2828
# Assert
29-
asdf = pd.timestamps[0]
29+
asdf = td.timestamps[0]
3030
param = asdf.parameters
31-
self.assertEqual(pd.timestamps[0].parameters["str"].string_value, "value")
31+
self.assertEqual(td.timestamps[0].parameters["str"].string_value, "value")
3232

3333
def test_add_bytes(self):
3434
# Arrange
35-
pd = TimeseriesData()
35+
td = TimeseriesData()
3636
expected = bytes("some bytes", "utf-8")
3737

3838
# Act
39-
pd.add_timestamp_nanoseconds(100) \
39+
td.add_timestamp_nanoseconds(100) \
4040
.add_value("bytes", expected)
4141

4242
# Assert
43-
self.assertEqual(expected, pd.timestamps[0].parameters["bytes"].binary_value)
43+
self.assertEqual(expected, td.timestamps[0].parameters["bytes"].binary_value)
44+
45+
def test_add_bytearray(self):
46+
# Arrange
47+
td = TimeseriesData()
48+
expected = bytearray("some bytes", "utf-8")
49+
50+
# Act
51+
td.add_timestamp_nanoseconds(100) \
52+
.add_value("bytearray", expected)
53+
54+
# Assert
55+
self.assertEqual(expected, td.timestamps[0].parameters["bytearray"].binary_value)
4456

4557
def test_add_tag(self):
4658
# Arrange
47-
pd = TimeseriesData()
48-
pd.add_timestamp_nanoseconds(100)
49-
pdts = pd.timestamps[0]
50-
pdts.add_tag("a", "b")
59+
td = TimeseriesData()
60+
td.add_timestamp_nanoseconds(100)
61+
tdts = td.timestamps[0]
62+
tdts.add_tag("a", "b")
5163

5264
# Act
53-
pd.add_timestamp_nanoseconds(200)
54-
pdts2 = pd.timestamps[1]
55-
pdts.add_tag("b", "c")
56-
pdts2.add_tags(pdts.tags)
65+
td.add_timestamp_nanoseconds(200)
66+
tdts2 = td.timestamps[1]
67+
tdts.add_tag("b", "c")
68+
tdts2.add_tags(tdts.tags)
5769

5870
# Assert
59-
self.assertEqual(pdts2.tags["a"], "b")
60-
self.assertEqual(pdts2.tags["b"], "c")
71+
self.assertEqual(tdts2.tags["a"], "b")
72+
self.assertEqual(tdts2.tags["b"], "c")
6173

6274
def test_add_tags(self):
6375
# Act

0 commit comments

Comments
 (0)