Skip to content

Commit 0ab9f76

Browse files
committed
Heartbeat basics
1 parent da3142f commit 0ab9f76

File tree

6 files changed

+219
-13
lines changed

6 files changed

+219
-13
lines changed

quixstreams/app.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import uuid
88
import warnings
99
from collections import defaultdict
10+
from datetime import datetime
1011
from pathlib import Path
1112
from typing import Callable, List, Literal, Optional, Protocol, Tuple, Type, Union
1213

@@ -30,6 +31,8 @@
3031
from .logging import LogLevel, configure_logging
3132
from .models import (
3233
DeserializerType,
34+
MessageContext,
35+
Row,
3336
SerializerType,
3437
TimestampExtractor,
3538
Topic,
@@ -153,6 +156,7 @@ def __init__(
153156
topic_create_timeout: float = 60,
154157
processing_guarantee: ProcessingGuarantee = "at-least-once",
155158
max_partition_buffer_size: int = 10000,
159+
heartbeat_interval: float = 0.0,
156160
):
157161
"""
158162
:param broker_address: Connection settings for Kafka.
@@ -222,6 +226,12 @@ def __init__(
222226
It is a soft limit, and the actual number of buffered messages can be up to x2 higher.
223227
Lower value decreases the memory use, but increases the latency.
224228
Default - `10000`.
229+
:param heartbeat_interval: the interval (seconds) at which to send heartbeat messages.
230+
The heartbeat timing starts counting from application start.
231+
TODO: Save and respect last heartbeat timestamp.
232+
The heartbeat is sent for every partition of every topic with registered heartbeat streams.
233+
If the value is 0, no heartbeat messages will be sent.
234+
Default - `0.0`.
225235
226236
<br><br>***Error Handlers***<br>
227237
To handle errors, `Application` accepts callbacks triggered when
@@ -370,6 +380,10 @@ def __init__(
370380
recovery_manager=recovery_manager,
371381
)
372382

383+
self._heartbeat_active = heartbeat_interval > 0
384+
self._heartbeat_interval = heartbeat_interval
385+
self._heartbeat_last_sent = datetime.now().timestamp()
386+
373387
self._source_manager = SourceManager()
374388
self._sink_manager = SinkManager()
375389
self._dataframe_registry = DataFrameRegistry()
@@ -899,6 +913,7 @@ def _run_dataframe(self, sink: Optional[VoidExecutor] = None):
899913
processing_context = self._processing_context
900914
source_manager = self._source_manager
901915
process_message = self._process_message
916+
process_heartbeat = self._process_heartbeat
902917
printer = self._processing_context.printer
903918
run_tracker = self._run_tracker
904919
consumer = self._consumer
@@ -911,6 +926,9 @@ def _run_dataframe(self, sink: Optional[VoidExecutor] = None):
911926
)
912927

913928
dataframes_composed = self._dataframe_registry.compose_all(sink=sink)
929+
heartbeats_composed = self._dataframe_registry.compose_heartbeats()
930+
if not heartbeats_composed:
931+
self._heartbeat_active = False
914932

915933
processing_context.init_checkpoint()
916934
run_tracker.set_as_running()
@@ -922,6 +940,7 @@ def _run_dataframe(self, sink: Optional[VoidExecutor] = None):
922940
run_tracker.timeout_refresh()
923941
else:
924942
process_message(dataframes_composed)
943+
process_heartbeat(heartbeats_composed)
925944
processing_context.commit_checkpoint()
926945
consumer.resume_backpressured()
927946
source_manager.raise_for_error()
@@ -1005,6 +1024,41 @@ def _process_message(self, dataframe_composed):
10051024
if self._on_message_processed is not None:
10061025
self._on_message_processed(topic_name, partition, offset)
10071026

1027+
def _process_heartbeat(self, heartbeats_composed):
1028+
if not self._heartbeat_active:
1029+
return
1030+
1031+
now = datetime.now().timestamp()
1032+
if self._heartbeat_last_sent > now - self._heartbeat_interval:
1033+
return
1034+
1035+
value, key, timestamp, headers = None, None, int(now * 1000), {}
1036+
1037+
for tp in self._consumer.assignment():
1038+
if executor := heartbeats_composed.get(tp.topic):
1039+
row = Row(
1040+
value=value,
1041+
key=key,
1042+
timestamp=timestamp,
1043+
context=MessageContext(
1044+
topic=tp.topic,
1045+
partition=tp.partition,
1046+
offset=-1, # TODO: get correct offsets
1047+
size=-1,
1048+
),
1049+
headers=headers,
1050+
)
1051+
context = copy_context()
1052+
context.run(set_message_context, row.context)
1053+
try:
1054+
context.run(executor, value, key, timestamp, headers)
1055+
except Exception as exc:
1056+
to_suppress = self._on_processing_error(exc, row, logger)
1057+
if not to_suppress:
1058+
raise
1059+
1060+
self._heartbeat_last_sent = now
1061+
10081062
def _on_assign(self, _, topic_partitions: List[TopicPartition]):
10091063
"""
10101064
Assign new topic partitions to consumer and state.

quixstreams/core/stream/functions/transform.py

Lines changed: 81 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
11
from typing import Any, Literal, Union, cast, overload
22

33
from .base import StreamFunction
4-
from .types import TransformCallback, TransformExpandedCallback, VoidExecutor
4+
from .types import (
5+
TransformCallback,
6+
TransformExpandedCallback,
7+
TransformHeartbeatCallback,
8+
TransformHeartbeatExpandedCallback,
9+
VoidExecutor,
10+
)
511

612
__all__ = ("TransformFunction",)
713

@@ -23,28 +29,81 @@ class TransformFunction(StreamFunction):
2329

2430
@overload
2531
def __init__(
26-
self, func: TransformCallback, expand: Literal[False] = False
32+
self,
33+
func: TransformCallback,
34+
expand: Literal[False] = False,
35+
heartbeat: Literal[False] = False,
36+
) -> None: ...
37+
38+
@overload
39+
def __init__(
40+
self,
41+
func: TransformExpandedCallback,
42+
expand: Literal[True],
43+
heartbeat: Literal[False] = False,
44+
) -> None: ...
45+
46+
@overload
47+
def __init__(
48+
self,
49+
func: TransformHeartbeatCallback,
50+
expand: Literal[False] = False,
51+
heartbeat: Literal[True] = True,
2752
) -> None: ...
2853

2954
@overload
3055
def __init__(
31-
self, func: TransformExpandedCallback, expand: Literal[True]
56+
self,
57+
func: TransformHeartbeatExpandedCallback,
58+
expand: Literal[True],
59+
heartbeat: Literal[True],
3260
) -> None: ...
3361

3462
def __init__(
3563
self,
36-
func: Union[TransformCallback, TransformExpandedCallback],
64+
func: Union[
65+
TransformCallback,
66+
TransformExpandedCallback,
67+
TransformHeartbeatCallback,
68+
TransformHeartbeatExpandedCallback,
69+
],
3770
expand: bool = False,
71+
heartbeat: bool = False,
3872
):
3973
super().__init__(func)
4074

41-
self.func: Union[TransformCallback, TransformExpandedCallback]
75+
self.func: Union[
76+
TransformCallback,
77+
TransformExpandedCallback,
78+
TransformHeartbeatCallback,
79+
TransformHeartbeatExpandedCallback,
80+
]
4281
self.expand = expand
82+
self.heartbeat = heartbeat
4383

4484
def get_executor(self, *child_executors: VoidExecutor) -> VoidExecutor:
4585
child_executor = self._resolve_branching(*child_executors)
4686

47-
if self.expand:
87+
if self.expand and self.heartbeat:
88+
heartbeat_expanded_func = cast(
89+
TransformHeartbeatExpandedCallback, self.func
90+
)
91+
92+
def wrapper(
93+
value: Any,
94+
key: Any,
95+
timestamp: int,
96+
headers: Any,
97+
):
98+
for (
99+
new_value,
100+
new_key,
101+
new_timestamp,
102+
new_headers,
103+
) in heartbeat_expanded_func(timestamp):
104+
child_executor(new_value, new_key, new_timestamp, new_headers)
105+
106+
elif self.expand:
48107
expanded_func = cast(TransformExpandedCallback, self.func)
49108

50109
def wrapper(
@@ -57,8 +116,22 @@ def wrapper(
57116
for new_value, new_key, new_timestamp, new_headers in result:
58117
child_executor(new_value, new_key, new_timestamp, new_headers)
59118

119+
elif self.heartbeat:
120+
heartbeat_func = cast(TransformHeartbeatCallback, self.func)
121+
122+
def wrapper(
123+
value: Any,
124+
key: Any,
125+
timestamp: int,
126+
headers: Any,
127+
):
128+
new_value, new_key, new_timestamp, new_headers = heartbeat_func(
129+
timestamp
130+
)
131+
child_executor(new_value, new_key, new_timestamp, new_headers)
132+
60133
else:
61-
func = cast(TransformCallback, self.func)
134+
regular_func = cast(TransformCallback, self.func)
62135

63136
def wrapper(
64137
value: Any,
@@ -67,7 +140,7 @@ def wrapper(
67140
headers: Any,
68141
):
69142
# Execute a function on a single value and return its result
70-
new_value, new_key, new_timestamp, new_headers = func(
143+
new_value, new_key, new_timestamp, new_headers = regular_func(
71144
value, key, timestamp, headers
72145
)
73146
child_executor(new_value, new_key, new_timestamp, new_headers)

quixstreams/core/stream/functions/types.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
"FilterWithMetadataCallback",
1515
"TransformCallback",
1616
"TransformExpandedCallback",
17+
"TransformHeartbeatCallback",
18+
"TransformHeartbeatExpandedCallback",
1719
)
1820

1921

@@ -35,6 +37,10 @@ def __bool__(self) -> bool: ...
3537
TransformExpandedCallback = Callable[
3638
[Any, Any, int, Any], Iterable[Tuple[Any, Any, int, Any]]
3739
]
40+
TransformHeartbeatCallback = Callable[[int], Tuple[Any, Any, int, Any]]
41+
TransformHeartbeatExpandedCallback = Callable[
42+
[int], Iterable[Tuple[Any, Any, int, Any]]
43+
]
3844

3945
StreamCallback = Union[
4046
ApplyCallback,

quixstreams/core/stream/stream.py

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
TransformCallback,
3131
TransformExpandedCallback,
3232
TransformFunction,
33+
TransformHeartbeatCallback,
34+
TransformHeartbeatExpandedCallback,
3335
UpdateCallback,
3436
UpdateFunction,
3537
UpdateWithMetadataCallback,
@@ -249,18 +251,56 @@ def add_update(
249251
return self._add(update_func)
250252

251253
@overload
252-
def add_transform(self, func: TransformCallback, *, expand: Literal[False] = False):
254+
def add_transform(
255+
self,
256+
func: TransformCallback,
257+
*,
258+
expand: Literal[False] = False,
259+
heartbeat: Literal[False] = False,
260+
):
253261
pass
254262

255263
@overload
256-
def add_transform(self, func: TransformExpandedCallback, *, expand: Literal[True]):
264+
def add_transform(
265+
self,
266+
func: TransformExpandedCallback,
267+
*,
268+
expand: Literal[True],
269+
heartbeat: Literal[False] = False,
270+
):
257271
pass
258272

273+
@overload
259274
def add_transform(
260275
self,
261-
func: Union[TransformCallback, TransformExpandedCallback],
276+
func: TransformHeartbeatCallback,
277+
*,
278+
expand: Literal[False] = False,
279+
heartbeat: Literal[True],
280+
):
281+
pass
282+
283+
@overload
284+
def add_transform(
285+
self,
286+
func: TransformHeartbeatExpandedCallback,
287+
*,
288+
expand: Literal[True],
289+
heartbeat: Literal[True],
290+
):
291+
pass
292+
293+
def add_transform(
294+
self,
295+
func: Union[
296+
TransformCallback,
297+
TransformExpandedCallback,
298+
TransformHeartbeatCallback,
299+
TransformHeartbeatExpandedCallback,
300+
],
262301
*,
263302
expand: bool = False,
303+
heartbeat: bool = False,
264304
) -> "Stream":
265305
"""
266306
Add a "transform" function to the Stream, that will mutate the input value.
@@ -276,9 +316,11 @@ def add_transform(
276316
:param expand: if True, expand the returned iterable into individual items
277317
downstream. If returned value is not iterable, `TypeError` will be raised.
278318
Default - `False`.
319+
:param heartbeat: if True, the callback is expected to accept timestamp only.
320+
Default - `False`.
279321
:return: a new Stream derived from the current one
280322
"""
281-
return self._add(TransformFunction(func, expand=expand)) # type: ignore[call-overload]
323+
return self._add(TransformFunction(func, expand=expand, heartbeat=heartbeat)) # type: ignore[call-overload]
282324

283325
def merge(self, other: "Stream") -> "Stream":
284326
"""

quixstreams/dataframe/dataframe.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1696,6 +1696,10 @@ def concat(self, other: "StreamingDataFrame") -> "StreamingDataFrame":
16961696
stream=merged_stream, stream_id=merged_stream_id
16971697
)
16981698

1699+
def concat_heartbeat(self, stream: Stream) -> "StreamingDataFrame":
1700+
self._registry.register_heartbeat(self, stream)
1701+
return self.__dataframe_clone__(stream=self.stream.merge(stream))
1702+
16991703
def join_asof(
17001704
self,
17011705
right: "StreamingDataFrame",

0 commit comments

Comments (0)