Skip to content

Commit a92aa32

Browse files
authored
Add a method to clear the application local state (#239)
Add Application.clear_state() method to clear the local state directory before the process starts.
1 parent 266776d commit a92aa32

File tree

6 files changed

+151
-0
lines changed

6 files changed

+151
-0
lines changed

src/StreamingDataFrames/docs/stateful-processing.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,19 @@ Currently, only functions passed to `StreamingDataFrame.apply()`, `StreamingData
7373

7474
<br>
7575

76+
## Clearing the State
77+
78+
To clear all the state data, use the `app.clear_state()` command. This will delete all data stored in the state stores, allowing you to start from a clean slate:
79+
80+
```python
81+
app.clear_state()
82+
```
83+
84+
Note that clearing the app state using `app.clear_state()` is only possible when the `app.run()` is not running. Meaning that the state can be cleared either before calling `app.run()` or after.
85+
This ensures that state clearing does not interfere with the ongoing stateful processing.
86+
87+
<br>
88+
7689
## Changing the State FilePath
7790

7891
By default, an `Application` keeps the state in `state` directory relative to the current working directory.

src/StreamingDataFrames/quixstreams/app.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,12 @@ def stop(self):
346346
"""
347347
self._running = False
348348

349+
def clear_state(self):
350+
"""
351+
Clear the state of the application.
352+
"""
353+
self._state_manager.clear_stores()
354+
349355
def _quix_runtime_init(self):
350356
"""
351357
Do a runtime setup only applicable to an Application.Quix instance

src/StreamingDataFrames/quixstreams/state/exceptions.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ class PartitionNotAssignedError(QuixException):
55
...
66

77

8+
class PartitionStoreIsUsed(QuixException):
9+
...
10+
11+
812
class StoreNotRegisteredError(QuixException):
913
...
1014

src/StreamingDataFrames/quixstreams/state/manager.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
import contextlib
22
import logging
3+
import shutil
34
from pathlib import Path
45
from typing import List, Dict, Optional, Iterator
56

67
from quixstreams.types import TopicPartition
78
from .exceptions import (
89
StoreNotRegisteredError,
910
InvalidStoreTransactionStateError,
11+
PartitionStoreIsUsed,
1012
)
1113
from .rocksdb import RocksDBStore, RocksDBOptionsType
1214
from .types import (
@@ -104,6 +106,21 @@ def register_store(
104106
options=self._rocksdb_options,
105107
)
106108

109+
def clear_stores(self):
110+
"""
111+
Delete all state stores managed by StateStoreManager.
112+
"""
113+
if any(
114+
store.partitions
115+
for topic_stores in self._stores.values()
116+
for store in topic_stores.values()
117+
):
118+
raise PartitionStoreIsUsed(
119+
"Cannot clear stores with active partitions assigned"
120+
)
121+
122+
shutil.rmtree(self._state_dir)
123+
107124
def on_partition_assign(self, tp: TopicPartition) -> List[StorePartition]:
108125
"""
109126
Assign store partitions for each registered store for the given `TopicPartition`

src/StreamingDataFrames/tests/test_quixstreams/test_app.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -717,3 +717,58 @@ def test_on_assign_topic_offset_behind_warning(
717717

718718
assert mock.called
719719
assert "is behind the stored offset" in mock.call_args[0][0]
720+
721+
def test_clear_state(
722+
self,
723+
app_factory,
724+
producer,
725+
topic_factory,
726+
executor,
727+
state_manager_factory,
728+
tmp_path,
729+
):
730+
"""
731+
Test that clear_state() removes all data from the state stores
732+
"""
733+
734+
consumer_group = str(uuid.uuid4())
735+
state_dir = (tmp_path / "state").absolute()
736+
app = app_factory(
737+
consumer_group=consumer_group,
738+
auto_offset_reset="earliest",
739+
state_dir=state_dir,
740+
)
741+
742+
topic_in_name, _ = topic_factory()
743+
tx_prefix = b"key"
744+
745+
state_manager = state_manager_factory(
746+
group_id=consumer_group, state_dir=state_dir
747+
)
748+
749+
# Add data to the state store
750+
with state_manager:
751+
state_manager.register_store(topic_in_name, "default")
752+
state_manager.on_partition_assign(
753+
TopicPartitionStub(topic=topic_in_name, partition=0)
754+
)
755+
store = state_manager.get_store(topic=topic_in_name, store_name="default")
756+
with store.start_partition_transaction(partition=0) as tx:
757+
# All keys in state must be prefixed with the message key
758+
with tx.with_prefix(tx_prefix):
759+
tx.set("my_state", True)
760+
761+
# Clear the state
762+
app.clear_state()
763+
764+
# Check that the date is cleared from the state store
765+
with state_manager:
766+
state_manager.register_store(topic_in_name, "default")
767+
state_manager.on_partition_assign(
768+
TopicPartitionStub(topic=topic_in_name, partition=0)
769+
)
770+
store = state_manager.get_store(topic=topic_in_name, store_name="default")
771+
with store.start_partition_transaction(partition=0) as tx:
772+
# All keys in state must be prefixed with the message key
773+
with tx.with_prefix(tx_prefix):
774+
assert tx.get("my_state") is None

src/StreamingDataFrames/tests/test_quixstreams/test_state/test_manager.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import os
12
import contextlib
23
import uuid
34
from unittest.mock import patch
@@ -9,6 +10,7 @@
910
from quixstreams.state.exceptions import (
1011
StoreNotRegisteredError,
1112
InvalidStoreTransactionStateError,
13+
PartitionStoreIsUsed,
1214
)
1315

1416

@@ -108,6 +110,60 @@ def test_get_store_not_registered(self, state_manager):
108110
with pytest.raises(StoreNotRegisteredError):
109111
state_manager.get_store("topic", "store")
110112

113+
def test_clear_stores_when_empty(self, state_manager):
114+
state_manager.clear_stores()
115+
assert not state_manager.stores
116+
117+
def test_clear_stores(self, state_manager):
118+
# Register stores
119+
state_manager.register_store("topic1", store_name="store1")
120+
state_manager.register_store("topic1", store_name="extra_store")
121+
state_manager.register_store("topic2", store_name="store1")
122+
123+
# Define partitions
124+
partitions = [
125+
TopicPartitionStub("topic1", 0),
126+
TopicPartitionStub("topic1", 1),
127+
TopicPartitionStub("topic2", 0),
128+
]
129+
130+
# Assign partitions
131+
for tp in partitions:
132+
state_manager.on_partition_assign(tp)
133+
134+
# Collect paths of stores to be deleted
135+
stores_to_delete = [
136+
store_partition.path
137+
for topic_stores in state_manager.stores.values()
138+
for store in topic_stores.values()
139+
for store_partition in store.partitions.values()
140+
]
141+
142+
# Revoke partitions
143+
for tp in partitions:
144+
state_manager.on_partition_revoke(tp)
145+
146+
# Act - Delete stores
147+
state_manager.clear_stores()
148+
149+
# Assert store paths are deleted
150+
for path in stores_to_delete:
151+
assert not os.path.exists(path), f"RocksDB store at {path} was not deleted"
152+
153+
def test_clear_stores_fails(self, state_manager):
154+
# Register stores
155+
state_manager.register_store("topic1", store_name="store1")
156+
157+
# Define the partition
158+
partition = TopicPartitionStub("topic1", 0)
159+
160+
# Assign the partition
161+
state_manager.on_partition_assign(partition)
162+
163+
# Act - Delete stores
164+
with pytest.raises(PartitionStoreIsUsed):
165+
state_manager.clear_stores()
166+
111167
def test_store_transaction_success(self, state_manager):
112168
state_manager.register_store("topic", "store")
113169
tp = TopicPartitionStub("topic", 0)

0 commit comments

Comments
 (0)