11import logging
22import sys
33import time
4- from typing import Any , Iterable , Mapping , Optional
4+ import typing
5+ from typing import Any , Callable , Iterable , Literal , Mapping , Optional , Union
56
67from quixstreams .models import HeadersTuples
78
2021logger = logging .getLogger (__name__ )
2122
2223
# Accepted string spellings for the `time_precision` constructor argument.
# InfluxDB3Sink validates against these values and maps each one to a
# `WritePrecision` member through its `_TIME_PRECISIONS` table.
TimePrecision = Literal["ms", "ns", "us", "s"]

# The message value handed to the setter callables: string keys mapped to the
# scalar types InfluxDB accepts as field values (str, int, float, bool).
InfluxDBValueMap = dict[str, Union[str, int, float, bool]]

# Single-argument callables that derive a setting from the current message value.
FieldsCallable = Callable[[InfluxDBValueMap], Iterable[str]]
MeasurementCallable = Callable[[InfluxDBValueMap], str]
TagsCallable = Callable[[InfluxDBValueMap], Iterable[str]]


# What the constructor accepts for each setting: either a fixed value or a
# callable that computes the value from the message.
FieldsSetter = Union[Iterable[str], FieldsCallable]
MeasurementSetter = Union[str, MeasurementCallable]
TagsSetter = Union[Iterable[str], TagsCallable]
36+
37+
2338class InfluxDB3Sink (BatchingSink ):
39+ _TIME_PRECISIONS = {
40+ "ms" : WritePrecision .MS ,
41+ "ns" : WritePrecision .NS ,
42+ "us" : WritePrecision .US ,
43+ "s" : WritePrecision .S ,
44+ }
45+
2446 def __init__ (
2547 self ,
2648 token : str ,
2749 host : str ,
2850 organization_id : str ,
2951 database : str ,
30- measurement : str ,
31- fields_keys : Iterable [ str ] = (),
32- tags_keys : Iterable [ str ] = (),
52+ measurement : MeasurementSetter ,
53+ fields_keys : FieldsSetter = (),
54+ tags_keys : TagsSetter = (),
3355 time_key : Optional [str ] = None ,
34- time_precision : WritePrecision = WritePrecision .MS , # type: ignore
56+ time_precision : TimePrecision = "ms" ,
57+ allow_missing_fields : bool = False ,
3558 include_metadata_tags : bool = False ,
3659 batch_size : int = 1000 ,
3760 enable_gzip : bool = True ,
@@ -58,25 +81,34 @@ def __init__(
5881 :param host: InfluxDB host in format "https://<host>"
5982 :param organization_id: InfluxDB organization_id
6083 :param database: database name
61- :measurement: measurement name
62- :param fields_keys: a list of keys to be used as "fields" when writing to InfluxDB.
63- If present, it must not overlap with "tags_keys".
64- If empty, the whole record value will be used.
84+ :param measurement: measurement name as a string.
85+ Also accepts a single-argument callable that receives the current message
86+ data as a dict and returns a string.
87+ :param fields_keys: an iterable (list) of strings used as InfluxDB "fields".
88+ Also accepts a single-argument callable that receives the current message
89+ data as a dict and returns an iterable of strings.
90+ - If present, it must not overlap with "tags_keys".
91+ - If empty, the whole record value will be used.
6592 >***NOTE*** The fields' values can only be strings, floats, integers, or booleans.
6693 Default - `()`.
67- :param tags_keys: a list of keys to be used as "tags" when writing to InfluxDB.
68- If present, it must not overlap with "fields_keys".
69- These keys will be popped from the value dictionary
70- automatically because InfluxDB doesn't allow the same keys be
71- both in tags and fields.
72- If empty, no tags will be sent.
94+ :param tags_keys: an iterable (list) of strings used as InfluxDB "tags".
95+ Also accepts a single-argument callable that receives the current message
96+ data as a dict and returns an iterable of strings.
97+ - If present, it must not overlap with "fields_keys".
98+ - Given keys are popped from the value dictionary since the same key
99+ cannot be both a tag and field.
100+ - If empty, no tags will be sent.
73101 >***NOTE***: InfluxDB client always converts tag values to strings.
74102 Default - `()`.
75103 :param time_key: a key to be used as "time" when writing to InfluxDB.
76104 By default, the record timestamp will be used with "ms" time precision.
77105 When using a custom key, you may need to adjust the `time_precision` setting
78106 to match.
79107 :param time_precision: a time precision to use when writing to InfluxDB.
108+ Possible values: "ms", "ns", "us", "s".
109+ Default - `"ms"`.
110+ :param allow_missing_fields: if `True`, skip the missing fields keys, else raise `KeyError`.
111+ Default - `False`
80112 :param include_metadata_tags: if True, includes record's key, topic,
81113 and partition as tags.
82114 Default - `False`.
@@ -93,12 +125,18 @@ def __init__(
93125 """
94126
95127 super ().__init__ ()
96- fields_tags_keys_overlap = set (fields_keys ) & set (tags_keys )
97- if fields_tags_keys_overlap :
98- overlap_str = "," .join (str (k ) for k in fields_tags_keys_overlap )
128+ if time_precision not in (time_args := typing .get_args (TimePrecision )):
99129 raise ValueError (
100- f'Keys { overlap_str } are present in both "fields_keys" and "tags_keys"'
130+ f"Invalid 'time_precision' argument { time_precision } ; "
131+ f"valid options: { time_args } "
101132 )
133+ if not callable (fields_keys ) and not callable (tags_keys ):
134+ fields_tags_keys_overlap = set (fields_keys ) & set (tags_keys )
135+ if fields_tags_keys_overlap :
136+ overlap_str = "," .join (str (k ) for k in fields_tags_keys_overlap )
137+ raise ValueError (
138+ f'Keys { overlap_str } are present in both "fields_keys" and "tags_keys"'
139+ )
102140
103141 self ._client = InfluxDBClient3 (
104142 token = token ,
@@ -114,13 +152,30 @@ def __init__(
114152 )
115153 },
116154 )
117- self ._measurement = measurement
118- self ._fields_keys = fields_keys
119- self ._tags_keys = tags_keys
155+
156+ self ._measurement = self ._measurement_callable (measurement )
157+ self ._fields_keys = self ._fields_callable (fields_keys )
158+ self ._tags_keys = self ._tags_callable (tags_keys )
120159 self ._include_metadata_tags = include_metadata_tags
121160 self ._time_key = time_key
122- self ._write_precision = time_precision
161+ self ._write_precision = self . _TIME_PRECISIONS [ time_precision ]
123162 self ._batch_size = batch_size
163+ self ._allow_missing_fields = allow_missing_fields
164+
165+ def _measurement_callable (self , setter : MeasurementSetter ) -> MeasurementCallable :
166+ if callable (setter ):
167+ return setter
168+ return lambda value : setter
169+
170+ def _fields_callable (self , setter : FieldsSetter ) -> FieldsCallable :
171+ if callable (setter ):
172+ return setter
173+ return lambda value : setter
174+
175+ def _tags_callable (self , setter : TagsSetter ) -> TagsCallable :
176+ if callable (setter ):
177+ return setter
178+ return lambda value : setter
124179
125180 def add (
126181 self ,
@@ -160,15 +215,19 @@ def write(self, batch: SinkBatch):
160215
161216 for item in write_batch :
162217 value = item .value
218+ # Evaluate these before we alter the value
219+ _measurement = measurement (value )
220+ _tags_keys = tags_keys (value )
221+ _fields_keys = fields_keys (value )
222+
163223 tags = {}
164- if tags_keys :
165- for tag_key in tags_keys :
166- # TODO: InfluxDB client always converts tags values to strings
167- # by doing str().
168- # We may add some extra validation here in the future to prevent
169- # unwanted conversion.
170- tag = value .pop (tag_key )
171- tags [tag_key ] = tag
224+ for tag_key in _tags_keys :
225+ # TODO: InfluxDB client always converts tags values to strings
226+ # by doing str().
227+ # We may add some extra validation here in the future to prevent
228+ # unwanted conversion.
229+ tag = value .pop (tag_key )
230+ tags [tag_key ] = tag
172231
173232 if self ._include_metadata_tags :
174233 tags ["__key" ] = item .key
@@ -178,15 +237,16 @@ def write(self, batch: SinkBatch):
178237 fields = (
179238 {
180239 field_key : value [field_key ]
181- for field_key in fields_keys
182- if field_key not in tags_keys
240+ for field_key in _fields_keys
241+ if (field_key in value or not self ._allow_missing_fields )
242+ and field_key not in _tags_keys
183243 }
184- if fields_keys
244+ if _fields_keys
185245 else value
186246 )
187247 ts = value [time_key ] if time_key is not None else item .timestamp
188248 record = {
189- "measurement" : measurement ,
249+ "measurement" : _measurement ,
190250 "tags" : tags ,
191251 "fields" : fields ,
192252 "time" : ts ,
0 commit comments