Skip to content

Commit 567196b

Browse files
committed
Rename heartbeat to wall clock
1 parent 0ab9f76 commit 567196b

File tree

6 files changed

+66
-66
lines changed

6 files changed

+66
-66
lines changed

quixstreams/app.py

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ def __init__(
156156
topic_create_timeout: float = 60,
157157
processing_guarantee: ProcessingGuarantee = "at-least-once",
158158
max_partition_buffer_size: int = 10000,
159-
heartbeat_interval: float = 0.0,
159+
wall_clock_interval: float = 0.0,
160160
):
161161
"""
162162
:param broker_address: Connection settings for Kafka.
@@ -226,11 +226,11 @@ def __init__(
226226
It is a soft limit, and the actual number of buffered messages can be up to x2 higher.
227227
Lower value decreases the memory use, but increases the latency.
228228
Default - `10000`.
229-
:param heartbeat_interval: the interval (seconds) at which to send heartbeat messages.
230-
The heartbeat timing starts counting from application start.
231-
TODO: Save and respect last heartbeat timestamp.
232-
The heartbeat is sent for every partition of every topic with registered heartbeat streams.
233-
If the value is 0, no heartbeat messages will be sent.
229+
:param wall_clock_interval: the interval (seconds) at which to invoke
230+
the registered wall clock logic.
231+
The wall clock timing starts counting from application start.
232+
TODO: Save and respect last wall clock timestamp.
233+
If the value is 0, no wall clock logic will be invoked.
234234
Default - `0.0`.
235235
236236
<br><br>***Error Handlers***<br>
@@ -380,9 +380,9 @@ def __init__(
380380
recovery_manager=recovery_manager,
381381
)
382382

383-
self._heartbeat_active = heartbeat_interval > 0
384-
self._heartbeat_interval = heartbeat_interval
385-
self._heartbeat_last_sent = datetime.now().timestamp()
383+
self._wall_clock_active = wall_clock_interval > 0
384+
self._wall_clock_interval = wall_clock_interval
385+
self._wall_clock_last_sent = datetime.now().timestamp()
386386

387387
self._source_manager = SourceManager()
388388
self._sink_manager = SinkManager()
@@ -913,7 +913,7 @@ def _run_dataframe(self, sink: Optional[VoidExecutor] = None):
913913
processing_context = self._processing_context
914914
source_manager = self._source_manager
915915
process_message = self._process_message
916-
process_heartbeat = self._process_heartbeat
916+
process_wall_clock = self._process_wall_clock
917917
printer = self._processing_context.printer
918918
run_tracker = self._run_tracker
919919
consumer = self._consumer
@@ -926,9 +926,9 @@ def _run_dataframe(self, sink: Optional[VoidExecutor] = None):
926926
)
927927

928928
dataframes_composed = self._dataframe_registry.compose_all(sink=sink)
929-
heartbeats_composed = self._dataframe_registry.compose_heartbeats()
930-
if not heartbeats_composed:
931-
self._heartbeat_active = False
929+
wall_clock_executors = self._dataframe_registry.compose_wall_clock()
930+
if not wall_clock_executors:
931+
self._wall_clock_active = False
932932

933933
processing_context.init_checkpoint()
934934
run_tracker.set_as_running()
@@ -940,7 +940,7 @@ def _run_dataframe(self, sink: Optional[VoidExecutor] = None):
940940
run_tracker.timeout_refresh()
941941
else:
942942
process_message(dataframes_composed)
943-
process_heartbeat(heartbeats_composed)
943+
process_wall_clock(wall_clock_executors)
944944
processing_context.commit_checkpoint()
945945
consumer.resume_backpressured()
946946
source_manager.raise_for_error()
@@ -1024,18 +1024,18 @@ def _process_message(self, dataframe_composed):
10241024
if self._on_message_processed is not None:
10251025
self._on_message_processed(topic_name, partition, offset)
10261026

1027-
def _process_heartbeat(self, heartbeats_composed):
1028-
if not self._heartbeat_active:
1027+
def _process_wall_clock(self, wall_clock_executors):
1028+
if not self._wall_clock_active:
10291029
return
10301030

10311031
now = datetime.now().timestamp()
1032-
if self._heartbeat_last_sent > now - self._heartbeat_interval:
1032+
if self._wall_clock_last_sent > now - self._wall_clock_interval:
10331033
return
10341034

10351035
value, key, timestamp, headers = None, None, int(now * 1000), {}
10361036

10371037
for tp in self._consumer.assignment():
1038-
if executor := heartbeats_composed.get(tp.topic):
1038+
if executor := wall_clock_executors.get(tp.topic):
10391039
row = Row(
10401040
value=value,
10411041
key=key,
@@ -1057,7 +1057,7 @@ def _process_heartbeat(self, heartbeats_composed):
10571057
if not to_suppress:
10581058
raise
10591059

1060-
self._heartbeat_last_sent = now
1060+
self._wall_clock_last_sent = now
10611061

10621062
def _on_assign(self, _, topic_partitions: List[TopicPartition]):
10631063
"""

quixstreams/core/stream/functions/transform.py

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
from .types import (
55
TransformCallback,
66
TransformExpandedCallback,
7-
TransformHeartbeatCallback,
8-
TransformHeartbeatExpandedCallback,
7+
TransformWallClockCallback,
8+
TransformWallClockExpandedCallback,
99
VoidExecutor,
1010
)
1111

@@ -32,61 +32,61 @@ def __init__(
3232
self,
3333
func: TransformCallback,
3434
expand: Literal[False] = False,
35-
heartbeat: Literal[False] = False,
35+
wall_clock: Literal[False] = False,
3636
) -> None: ...
3737

3838
@overload
3939
def __init__(
4040
self,
4141
func: TransformExpandedCallback,
4242
expand: Literal[True],
43-
heartbeat: Literal[False] = False,
43+
wall_clock: Literal[False] = False,
4444
) -> None: ...
4545

4646
@overload
4747
def __init__(
4848
self,
49-
func: TransformHeartbeatCallback,
49+
func: TransformWallClockCallback,
5050
expand: Literal[False] = False,
51-
heartbeat: Literal[True] = True,
51+
wall_clock: Literal[True] = True,
5252
) -> None: ...
5353

5454
@overload
5555
def __init__(
5656
self,
57-
func: TransformHeartbeatExpandedCallback,
57+
func: TransformWallClockExpandedCallback,
5858
expand: Literal[True],
59-
heartbeat: Literal[True],
59+
wall_clock: Literal[True],
6060
) -> None: ...
6161

6262
def __init__(
6363
self,
6464
func: Union[
6565
TransformCallback,
6666
TransformExpandedCallback,
67-
TransformHeartbeatCallback,
68-
TransformHeartbeatExpandedCallback,
67+
TransformWallClockCallback,
68+
TransformWallClockExpandedCallback,
6969
],
7070
expand: bool = False,
71-
heartbeat: bool = False,
71+
wall_clock: bool = False,
7272
):
7373
super().__init__(func)
7474

7575
self.func: Union[
7676
TransformCallback,
7777
TransformExpandedCallback,
78-
TransformHeartbeatCallback,
79-
TransformHeartbeatExpandedCallback,
78+
TransformWallClockCallback,
79+
TransformWallClockExpandedCallback,
8080
]
8181
self.expand = expand
82-
self.heartbeat = heartbeat
82+
self.wall_clock = wall_clock
8383

8484
def get_executor(self, *child_executors: VoidExecutor) -> VoidExecutor:
8585
child_executor = self._resolve_branching(*child_executors)
8686

87-
if self.expand and self.heartbeat:
88-
heartbeat_expanded_func = cast(
89-
TransformHeartbeatExpandedCallback, self.func
87+
if self.expand and self.wall_clock:
88+
wall_clock_expanded_func = cast(
89+
TransformWallClockExpandedCallback, self.func
9090
)
9191

9292
def wrapper(
@@ -100,7 +100,7 @@ def wrapper(
100100
new_key,
101101
new_timestamp,
102102
new_headers,
103-
) in heartbeat_expanded_func(timestamp):
103+
) in wall_clock_expanded_func(timestamp):
104104
child_executor(new_value, new_key, new_timestamp, new_headers)
105105

106106
elif self.expand:
@@ -116,16 +116,16 @@ def wrapper(
116116
for new_value, new_key, new_timestamp, new_headers in result:
117117
child_executor(new_value, new_key, new_timestamp, new_headers)
118118

119-
elif self.heartbeat:
120-
heartbeat_func = cast(TransformHeartbeatCallback, self.func)
119+
elif self.wall_clock:
120+
wall_clock_func = cast(TransformWallClockCallback, self.func)
121121

122122
def wrapper(
123123
value: Any,
124124
key: Any,
125125
timestamp: int,
126126
headers: Any,
127127
):
128-
new_value, new_key, new_timestamp, new_headers = heartbeat_func(
128+
new_value, new_key, new_timestamp, new_headers = wall_clock_func(
129129
timestamp
130130
)
131131
child_executor(new_value, new_key, new_timestamp, new_headers)

quixstreams/core/stream/functions/types.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
"FilterWithMetadataCallback",
1515
"TransformCallback",
1616
"TransformExpandedCallback",
17-
"TransformHeartbeatCallback",
18-
"TransformHeartbeatExpandedCallback",
17+
"TransformWallClockCallback",
18+
"TransformWallClockExpandedCallback",
1919
)
2020

2121

@@ -37,8 +37,8 @@ def __bool__(self) -> bool: ...
3737
TransformExpandedCallback = Callable[
3838
[Any, Any, int, Any], Iterable[Tuple[Any, Any, int, Any]]
3939
]
40-
TransformHeartbeatCallback = Callable[[int], Tuple[Any, Any, int, Any]]
41-
TransformHeartbeatExpandedCallback = Callable[
40+
TransformWallClockCallback = Callable[[int], Tuple[Any, Any, int, Any]]
41+
TransformWallClockExpandedCallback = Callable[
4242
[int], Iterable[Tuple[Any, Any, int, Any]]
4343
]
4444

quixstreams/core/stream/stream.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@
3030
TransformCallback,
3131
TransformExpandedCallback,
3232
TransformFunction,
33-
TransformHeartbeatCallback,
34-
TransformHeartbeatExpandedCallback,
33+
TransformWallClockCallback,
34+
TransformWallClockExpandedCallback,
3535
UpdateCallback,
3636
UpdateFunction,
3737
UpdateWithMetadataCallback,
@@ -256,7 +256,7 @@ def add_transform(
256256
func: TransformCallback,
257257
*,
258258
expand: Literal[False] = False,
259-
heartbeat: Literal[False] = False,
259+
wall_clock: Literal[False] = False,
260260
):
261261
pass
262262

@@ -266,27 +266,27 @@ def add_transform(
266266
func: TransformExpandedCallback,
267267
*,
268268
expand: Literal[True],
269-
heartbeat: Literal[False] = False,
269+
wall_clock: Literal[False] = False,
270270
):
271271
pass
272272

273273
@overload
274274
def add_transform(
275275
self,
276-
func: TransformHeartbeatCallback,
276+
func: TransformWallClockCallback,
277277
*,
278278
expand: Literal[False] = False,
279-
heartbeat: Literal[True],
279+
wall_clock: Literal[True],
280280
):
281281
pass
282282

283283
@overload
284284
def add_transform(
285285
self,
286-
func: TransformHeartbeatExpandedCallback,
286+
func: TransformWallClockExpandedCallback,
287287
*,
288288
expand: Literal[True],
289-
heartbeat: Literal[True],
289+
wall_clock: Literal[True],
290290
):
291291
pass
292292

@@ -295,12 +295,12 @@ def add_transform(
295295
func: Union[
296296
TransformCallback,
297297
TransformExpandedCallback,
298-
TransformHeartbeatCallback,
299-
TransformHeartbeatExpandedCallback,
298+
TransformWallClockCallback,
299+
TransformWallClockExpandedCallback,
300300
],
301301
*,
302302
expand: bool = False,
303-
heartbeat: bool = False,
303+
wall_clock: bool = False,
304304
) -> "Stream":
305305
"""
306306
Add a "transform" function to the Stream, that will mutate the input value.
@@ -316,11 +316,11 @@ def add_transform(
316316
:param expand: if True, expand the returned iterable into individual items
317317
downstream. If returned value is not iterable, `TypeError` will be raised.
318318
Default - `False`.
319-
:param heartbeat: if True, the callback is expected to accept timestamp only.
319+
:param wall_clock: if True, the callback is expected to accept timestamp only.
320320
Default - `False`.
321321
:return: a new Stream derived from the current one
322322
"""
323-
return self._add(TransformFunction(func, expand=expand, heartbeat=heartbeat)) # type: ignore[call-overload]
323+
return self._add(TransformFunction(func, expand=expand, wall_clock=wall_clock)) # type: ignore[call-overload]
324324

325325
def merge(self, other: "Stream") -> "Stream":
326326
"""

quixstreams/dataframe/dataframe.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1696,8 +1696,8 @@ def concat(self, other: "StreamingDataFrame") -> "StreamingDataFrame":
16961696
stream=merged_stream, stream_id=merged_stream_id
16971697
)
16981698

1699-
def concat_heartbeat(self, stream: Stream) -> "StreamingDataFrame":
1700-
self._registry.register_heartbeat(self, stream)
1699+
def concat_wall_clock(self, stream: Stream) -> "StreamingDataFrame":
1700+
self._registry.register_wall_clock(self, stream)
17011701
return self.__dataframe_clone__(stream=self.stream.merge(stream))
17021702

17031703
def join_asof(

quixstreams/dataframe/registry.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ class DataFrameRegistry:
2222

2323
def __init__(self) -> None:
2424
self._registry: dict[str, Stream] = {}
25-
self._heartbeat_registry: dict[str, Stream] = {}
25+
self._wall_clock_registry: dict[str, Stream] = {}
2626
self._topics: list[Topic] = []
2727
self._repartition_origins: set[str] = set()
2828
self._topics_to_stream_ids: dict[str, set[str]] = {}
@@ -70,19 +70,19 @@ def register_root(
7070
self._topics.append(topic)
7171
self._registry[topic.name] = dataframe.stream
7272

73-
def register_heartbeat(
73+
def register_wall_clock(
7474
self, dataframe: "StreamingDataFrame", stream: Stream
7575
) -> None:
7676
"""
77-
Register a heartbeat Stream for the given topic.
77+
Register a wall clock stream for the given topic.
7878
"""
7979
topics = dataframe.topics
8080
if len(topics) > 1:
8181
raise ValueError(
8282
f"Expected a StreamingDataFrame with one topic, got {len(topics)}"
8383
)
8484
topic = topics[0]
85-
self._heartbeat_registry[topic.name] = stream
85+
self._wall_clock_registry[topic.name] = stream
8686

8787
def register_groupby(
8888
self,
@@ -130,12 +130,12 @@ def compose_all(
130130
"""
131131
return self._compose(registry=self._registry, sink=sink)
132132

133-
def compose_heartbeats(self) -> dict[str, VoidExecutor]:
133+
def compose_wall_clock(self) -> dict[str, VoidExecutor]:
134134
"""
135-
Composes all the heartbeat Streams and returns a dict of format {<topic>: <VoidExecutor>}
135+
Composes all the wall clock streams and returns a dict of format {<topic>: <VoidExecutor>}
136136
:return: a {topic_name: composed} dict, where composed is a callable
137137
"""
138-
return self._compose(registry=self._heartbeat_registry)
138+
return self._compose(registry=self._wall_clock_registry)
139139

140140
def _compose(
141141
self, registry: dict[str, Stream], sink: Optional[VoidExecutor] = None

0 commit comments

Comments
 (0)