Skip to content

Commit 418528f

Browse files
author
Roman Kitaev
committed
add eviction_listener callback
1 parent 2e9f498 commit 418528f

File tree

6 files changed

+179
-24
lines changed

6 files changed

+179
-24
lines changed

Cargo.lock

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "moka-py"
3-
version = "0.1.11"
3+
version = "0.1.12"
44
edition = "2021"
55

66
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

README.md

+54
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ async def f(x, y):
8181
await asyncio.sleep(2.0)
8282
return x + y
8383

84+
8485
start = perf_counter()
8586
assert asyncio.run(f(5, 6)) == 11
8687
assert asyncio.run(f(5, 6)) == 11 # got from cache
@@ -138,6 +139,59 @@ if __name__ == '__main__':
138139

139140
> **_ATTENTION:_** `wait_concurrent` is not yet supported for async functions and will throw `NotImplementedError`
140141
142+
## Eviction listener
143+
144+
moka-py supports adding of an eviction listener that's called whenever a key is dropped
145+
from the cache for some reason. The listener must be a 3-arguments function `(key, value, cause)`. The arguments
146+
are passed as positional (not keyword).
147+
148+
There are 4 reasons why a key may be dropped:
149+
150+
1. `"expired"`: The entry's expiration timestamp has passed.
151+
2. `"explicit"`: The entry was manually removed by the user (`.remove()` is called).
152+
3. `"replaced"`: The entry itself was not actually removed, but its value was replaced by the user (`.set()` is
153+
called for an existing entry).
154+
4. `"size"`: The entry was evicted due to size constraints.
155+
156+
```python
157+
from typing import Literal
158+
from moka_py import Moka
159+
from time import sleep
160+
161+
162+
def key_evicted(
163+
k: str,
164+
v: list[int],
165+
cause: Literal["explicit", "size", "expired", "replaced"]
166+
):
167+
print(f"entry {k}:{v} was evicted. {cause=}")
168+
169+
170+
moka: Moka[str, list[int]] = Moka(2, eviction_listener=key_evicted, ttl=0.1)
171+
moka.set("hello", [1, 2, 3])
172+
moka.set("hello", [3, 2, 1])
173+
moka.set("foo", [4])
174+
moka.set("bar", [])
175+
sleep(1)
176+
moka.get("foo")
177+
178+
# will print
179+
# entry hello:[1, 2, 3] was evicted. cause='replaced'
180+
# entry bar:[] was evicted. cause='size'
181+
# entry hello:[3, 2, 1] was evicted. cause='expired'
182+
# entry foo:[4] was evicted. cause='expired'
183+
```
184+
185+
> **_IMPORTANT NOTES_**:
186+
> 1. It's not guaranteed that the listener will be called just in time. Also, the underlying `moka` doesn't use any
187+
background threads or tasks, hence, the listener is never called in "background"
188+
> 2. The listener must never raise any kind of `Exception`. If an exception is raised, it might be raised to any of the
189+
moka-py method in any of the threads that call this method.
190+
> 3. The listener must be fast. Since it's called only when you're interacting with `moka-py` (via `.get()` / `.set()` /
191+
etc.), the listener will slow down these operations. It's terrible idea to do some sort of IO in the listener. If
192+
you need so, run a `ThreadPoolExecutor` somewhere and call `.submit()` inside of the listener or commit an async
193+
task via `asyncio.create_task()`
194+
141195
## Performance
142196

143197
*Measured using MacBook Pro 2021 with Apple M1 Pro processor and 16GiB RAM*

moka_py/__init__.pyi

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1-
from typing import TypeVar, Optional, Generic, Hashable, Union, Callable, Any, overload
1+
from typing import TypeVar, Optional, Generic, Hashable, Union, Callable, Any, overload, Literal
22

33

44
K = TypeVar("K", bound=Hashable)
55
V = TypeVar("V")
66
D = TypeVar("D")
77
Fn = TypeVar("Fn", bound=Callable[..., Any])
8+
Cause = Literal["explicit", "size", "expired", "replaced"]
89

910

1011
class Moka(Generic[K, V]):
@@ -13,6 +14,7 @@ class Moka(Generic[K, V]):
1314
capacity: int,
1415
ttl: Optional[Union[int, float]] = None,
1516
tti: Optional[Union[int, float]] = None,
17+
eviction_listener: Optional[Callable[[K, V, Cause], None]] = None,
1618
): ...
1719

1820
def set(self, key: K, value: V) -> None: ...

src/lib.rs

+73-20
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,15 @@ use std::hash::{Hash, Hasher};
22
use std::sync::Arc;
33
use std::time::Duration;
44

5+
use moka::notification::RemovalCause;
56
use moka::sync::Cache;
67
use pyo3::exceptions::PyValueError;
78
use pyo3::prelude::*;
89
use pyo3::pyclass::CompareOp;
910
use pyo3::types::{PyString, PyType};
1011

1112
#[derive(Debug)]
12-
enum AnyKey {
13+
enum KeyKind {
1314
/// String keys are the most common. If the string is short enough,
1415
/// we can get faster and more freedom from GIL by copying a string
1516
/// to Rust and hashing it using `ahash` instead of calling
@@ -20,47 +21,71 @@ enum AnyKey {
2021
ShortStr(String),
2122

2223
/// Other keys (even long Python strings) go this (slower) way
23-
Other(PyObject, isize),
24+
Other { py_hash: isize },
25+
}
26+
27+
#[derive(Debug)]
28+
struct AnyKey {
29+
obj: PyObject,
30+
kind: KeyKind,
2431
}
2532

2633
impl AnyKey {
2734
const SHORT_STR: usize = 256;
2835

2936
#[inline]
3037
fn new_with_gil(obj: PyObject, py: Python) -> PyResult<Self> {
31-
if let Ok(s) = obj.downcast_bound::<PyString>(py) {
32-
if s.len()? <= Self::SHORT_STR {
33-
return Ok(AnyKey::ShortStr(s.to_string()));
38+
let kind = match obj.downcast_bound::<PyString>(py) {
39+
Ok(s) if s.len()? <= Self::SHORT_STR => KeyKind::ShortStr(s.to_string()),
40+
_ => {
41+
let py_hash = obj.to_object(py).into_bound(py).hash()?;
42+
KeyKind::Other { py_hash }
3443
}
35-
}
36-
let hash = obj.to_object(py).into_bound(py).hash()?;
37-
Ok(AnyKey::Other(obj, hash))
44+
};
45+
Ok(AnyKey { obj, kind })
3846
}
3947
}
4048

4149
impl PartialEq for AnyKey {
4250
#[inline]
4351
fn eq(&self, other: &Self) -> bool {
4452
match (self, other) {
45-
(AnyKey::ShortStr(lhs), AnyKey::ShortStr(rhs)) => lhs == rhs,
53+
(
54+
AnyKey {
55+
kind: KeyKind::ShortStr(lhs),
56+
..
57+
},
58+
AnyKey {
59+
kind: KeyKind::ShortStr(rhs),
60+
..
61+
},
62+
) => lhs == rhs,
4663

4764
// It is expected that `hash` will be stable for an object. Hence, since we already
4865
// know both objects' hashes, we can claim that if their hashes are different,
4966
// the objects aren't equal. Only if the hashes are the same, the objects
5067
// might be equal, and only in that case we raise the GIL to run Python
5168
// rich comparison.
52-
(AnyKey::Other(lhs, lhs_hash), AnyKey::Other(rhs, rhs_hash)) => {
69+
(
70+
AnyKey {
71+
kind: KeyKind::Other { py_hash: lhs_hash },
72+
obj: lhs_obj,
73+
},
74+
AnyKey {
75+
kind: KeyKind::Other { py_hash: rhs_hash },
76+
obj: rhs_obj,
77+
},
78+
) => {
5379
*lhs_hash == *rhs_hash
5480
&& Python::with_gil(|py| {
55-
let lhs = lhs.to_object(py).into_bound(py);
56-
let rhs = rhs.to_object(py).into_bound(py);
81+
let lhs = lhs_obj.to_object(py).into_bound(py);
82+
let rhs = rhs_obj.to_object(py).into_bound(py);
5783
match lhs.rich_compare(rhs, CompareOp::Eq) {
5884
Ok(v) => v.is_truthy().unwrap_or_default(),
5985
Err(_) => false,
6086
}
6187
})
6288
}
63-
6489
_ => false,
6590
}
6691
}
@@ -70,39 +95,67 @@ impl Eq for AnyKey {}
7095
impl Hash for AnyKey {
7196
#[inline]
7297
fn hash<H: Hasher>(&self, state: &mut H) {
73-
match self {
74-
AnyKey::ShortStr(s) => s.hash(state),
75-
AnyKey::Other(_, hash) => hash.hash(state),
98+
match &self.kind {
99+
KeyKind::ShortStr(s) => s.hash(state),
100+
KeyKind::Other { py_hash } => py_hash.hash(state),
76101
}
77102
}
78103
}
79104

105+
#[inline]
106+
fn cause_to_str(cause: RemovalCause) -> &'static str {
107+
match cause {
108+
RemovalCause::Expired => "expired",
109+
RemovalCause::Explicit => "explicit",
110+
RemovalCause::Replaced => "replaced",
111+
RemovalCause::Size => "size",
112+
}
113+
}
114+
80115
#[pyclass]
81116
struct Moka(Arc<Cache<AnyKey, Arc<PyObject>, ahash::RandomState>>);
82117

83118
#[pymethods]
84119
impl Moka {
85120
#[new]
86-
#[pyo3(signature = (capacity, ttl=None, tti=None))]
87-
fn new(capacity: u64, ttl: Option<f64>, tti: Option<f64>) -> PyResult<Self> {
121+
#[pyo3(signature = (capacity, ttl=None, tti=None, eviction_listener=None))]
122+
fn new(
123+
capacity: u64,
124+
ttl: Option<f64>,
125+
tti: Option<f64>,
126+
eviction_listener: Option<PyObject>,
127+
) -> PyResult<Self> {
88128
let mut builder = Cache::builder().max_capacity(capacity);
89129

90130
if let Some(ttl) = ttl {
91-
let ttl_micros = (ttl * 1000_000.0) as u64;
131+
let ttl_micros = (ttl * 1_000_000.0) as u64;
92132
if ttl_micros == 0 {
93133
return Err(PyValueError::new_err("ttl must be positive"));
94134
}
95135
builder = builder.time_to_live(Duration::from_micros(ttl_micros));
96136
}
97137

98138
if let Some(tti) = tti {
99-
let tti_micros = (tti * 1000_000.0) as u64;
139+
let tti_micros = (tti * 1_000_000.0) as u64;
100140
if tti_micros == 0 {
101141
return Err(PyValueError::new_err("tti must be positive"));
102142
}
103143
builder = builder.time_to_idle(Duration::from_micros(tti_micros));
104144
}
105145

146+
if let Some(listener) = eviction_listener {
147+
let listen_fn = move |k: Arc<AnyKey>, v: Arc<PyObject>, cause: RemovalCause| {
148+
Python::with_gil(|py| {
149+
let key = k.as_ref().obj.clone_ref(py);
150+
let value = v.as_ref().clone_ref(py);
151+
if let Err(e) = listener.call1(py, (key, value, cause_to_str(cause))) {
152+
e.restore(py)
153+
}
154+
});
155+
};
156+
builder = builder.eviction_listener(Box::new(listen_fn));
157+
}
158+
106159
Ok(Moka(Arc::new(
107160
builder.build_with_hasher(ahash::RandomState::default()),
108161
)))

tests/test_moka.py

+47-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import asyncio
2-
from time import monotonic, sleep
32
import threading
3+
from concurrent.futures import ThreadPoolExecutor
4+
from time import monotonic, sleep, perf_counter
45

56
import moka_py
67

@@ -114,3 +115,48 @@ def target_2():
114115

115116
t1.join()
116117
t2.join()
118+
119+
120+
def test_eviction_listener():
121+
evicted = []
122+
123+
def listener(k, v, cause):
124+
evicted.append((k, v, cause))
125+
126+
moka = moka_py.Moka(3, eviction_listener=listener, ttl=0.1)
127+
moka.set("hello", "world")
128+
moka.set("hello", "REPLACED")
129+
moka.remove("hello")
130+
moka.set("foo", "bar")
131+
for i in range(10):
132+
moka.set(f"out-of-size-{i}", 123)
133+
sleep(1)
134+
assert moka.get("foo") is None
135+
assert {cause for _, _, cause in evicted} == {"size", "explicit", "expired", "replaced"}
136+
137+
138+
def test_eviction_listener_io():
139+
pool = ThreadPoolExecutor()
140+
evicted = []
141+
ev = threading.Event()
142+
143+
def slow_io(k, v, cause):
144+
sleep(1)
145+
evicted.append((k, v, cause))
146+
ev.set()
147+
148+
def listener(k, v, cause):
149+
pool.submit(slow_io, k, v, cause)
150+
151+
moka = moka_py.Moka(3, eviction_listener=listener, ttl=0.1)
152+
moka.set("hello", "world")
153+
sleep(0.5)
154+
155+
start = perf_counter()
156+
moka.get("hello")
157+
duration = perf_counter() - start
158+
assert duration < 1.0
159+
160+
# wait until the thread pool add the message
161+
ev.wait(2.0)
162+
assert len(evicted) == 1

0 commit comments

Comments
 (0)