Skip to content

Commit 18f55d3

Browse files
authored
Merge pull request #176 from opentensor/feat/thewhaleking/use-aiosqlite
DiskCachedAsyncSubstrateInterface: use aiosqlite
2 parents c660e19 + 958dd2e commit 18f55d3

File tree

4 files changed

+102
-11
lines changed

4 files changed

+102
-11
lines changed

README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,14 @@ the same on-disk cache.
8989
As with the other two caches, this also takes `SUBSTRATE_CACHE_METHOD_SIZE` and `SUBSTRATE_RUNTIME_CACHE_SIZE` env vars.
9090

9191

92+
### ENV VARS
93+
The following environment variables are used within async-substrate-interface
94+
- NO_CACHE (default 0): if set to 1, when using the DiskCachedAsyncSubstrateInterface class, no persistent on-disk cache will be stored, instead using only in-memory cache.
95+
- CACHE_LOCATION (default `~/.cache/async-substrate-interface`): this determines the location for the cache file, if using DiskCachedAsyncSubstrateInterface
96+
- SUBSTRATE_CACHE_METHOD_SIZE (default 512): the cache size (either in-memory or on-disk) of the smaller return-size methods (see the Caching section for more info)
97+
- SUBSTRATE_RUNTIME_CACHE_SIZE (default 16): the cache size (either in-memory or on-disk) of the larger return-size methods (see the Caching section for more info)
98+
99+
92100
## Contributing
93101

94102
Contributions are welcome! Please open an issue or submit a pull request to the `staging` branch.

async_substrate_interface/async_substrate.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
from async_substrate_interface.utils.cache import (
6464
async_sql_lru_cache,
6565
cached_fetcher,
66+
AsyncSqliteDB,
6667
)
6768
from async_substrate_interface.utils.decoding import (
6869
_determine_if_old_runtime_call,
@@ -4026,6 +4027,18 @@ class DiskCachedAsyncSubstrateInterface(AsyncSubstrateInterface):
40264027
Experimental new class that uses disk-caching in addition to memory-caching for the cached methods
40274028
"""
40284029

4030+
async def close(self):
4031+
"""
4032+
Closes the substrate connection, and the websocket connection.
4033+
"""
4034+
try:
4035+
await self.ws.shutdown()
4036+
except AttributeError:
4037+
pass
4038+
db_conn = AsyncSqliteDB(self.url)
4039+
if db_conn._db is not None:
4040+
await db_conn._db.close()
4041+
40294042
@async_sql_lru_cache(maxsize=SUBSTRATE_CACHE_METHOD_SIZE)
40304043
async def get_parent_block_hash(self, block_hash):
40314044
return await self._get_parent_block_hash(block_hash)

async_substrate_interface/utils/cache.py

Lines changed: 79 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
from pathlib import Path
1010
from typing import Callable, Any, Awaitable, Hashable, Optional
1111

12+
import aiosqlite
13+
14+
1215
USE_CACHE = True if os.getenv("NO_CACHE") != "1" else False
1316
CACHE_LOCATION = (
1417
os.path.expanduser(
@@ -21,6 +24,78 @@
2124
logger = logging.getLogger("async_substrate_interface")
2225

2326

27+
class AsyncSqliteDB:
28+
_instances: dict[str, "AsyncSqliteDB"] = {}
29+
_db: Optional[aiosqlite.Connection] = None
30+
_lock: Optional[asyncio.Lock] = None
31+
32+
def __new__(cls, chain_endpoint: str):
33+
try:
34+
return cls._instances[chain_endpoint]
35+
except KeyError:
36+
instance = super().__new__(cls)
37+
instance._lock = asyncio.Lock()
38+
cls._instances[chain_endpoint] = instance
39+
return instance
40+
41+
async def __call__(self, chain, other_self, func, args, kwargs) -> Optional[Any]:
42+
async with self._lock:
43+
if not self._db:
44+
_ensure_dir()
45+
self._db = await aiosqlite.connect(CACHE_LOCATION)
46+
table_name = _get_table_name(func)
47+
key = None
48+
if not (local_chain := _check_if_local(chain)) or not USE_CACHE:
49+
await self._db.execute(
50+
f"""
51+
CREATE TABLE IF NOT EXISTS {table_name}
52+
(
53+
rowid INTEGER PRIMARY KEY AUTOINCREMENT,
54+
key BLOB,
55+
value BLOB,
56+
chain TEXT,
57+
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
58+
);
59+
"""
60+
)
61+
await self._db.execute(
62+
f"""
63+
CREATE TRIGGER IF NOT EXISTS prune_rows_trigger_{table_name} AFTER INSERT ON {table_name}
64+
BEGIN
65+
DELETE FROM {table_name}
66+
WHERE rowid IN (
67+
SELECT rowid FROM {table_name}
68+
ORDER BY created_at DESC
69+
LIMIT -1 OFFSET 500
70+
);
71+
END;
72+
"""
73+
)
74+
await self._db.commit()
75+
key = pickle.dumps((args, kwargs or None))
76+
try:
77+
cursor: aiosqlite.Cursor = await self._db.execute(
78+
f"SELECT value FROM {table_name} WHERE key=? AND chain=?",
79+
(key, chain),
80+
)
81+
result = await cursor.fetchone()
82+
await cursor.close()
83+
if result is not None:
84+
return pickle.loads(result[0])
85+
except (pickle.PickleError, sqlite3.Error) as e:
86+
logger.exception("Cache error", exc_info=e)
87+
pass
88+
result = await func(other_self, *args, **kwargs)
89+
if not local_chain or not USE_CACHE:
90+
# TODO use a task here
91+
await self._db.execute(
92+
f"INSERT OR REPLACE INTO {table_name} (key, value, chain) VALUES (?,?,?)",
93+
(key, pickle.dumps(result), chain),
94+
)
95+
await self._db.commit()
96+
return result
97+
98+
2499
def _ensure_dir():
25100
path = Path(CACHE_LOCATION).parent
26101
if not path.exists():
@@ -115,7 +190,8 @@ def inner(self, *args, **kwargs):
115190
)
116191

117192
# If not in DB, call func and store in DB
118-
result = func(self, *args, **kwargs)
193+
if result is None:
194+
result = func(self, *args, **kwargs)
119195

120196
if not local_chain or not USE_CACHE:
121197
_insert_into_cache(c, conn, table_name, key, result, chain)
@@ -131,15 +207,8 @@ def async_sql_lru_cache(maxsize: Optional[int] = None):
131207
def decorator(func):
132208
@cached_fetcher(max_size=maxsize)
133209
async def inner(self, *args, **kwargs):
134-
c, conn, table_name, key, result, chain, local_chain = (
135-
_shared_inner_fn_logic(func, self, args, kwargs)
136-
)
137-
138-
# If not in DB, call func and store in DB
139-
result = await func(self, *args, **kwargs)
140-
if not local_chain or not USE_CACHE:
141-
_insert_into_cache(c, conn, table_name, key, result, chain)
142-
210+
async_sql_db = AsyncSqliteDB(self.url)
211+
result = await async_sql_db(self.url, self, func, args, kwargs)
143212
return result
144213

145214
return inner

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ dependencies = [
1111
"bt-decode==v0.6.0",
1212
"scalecodec~=1.2.11",
1313
"websockets>=14.1",
14-
"xxhash"
14+
"xxhash",
15+
"aiosqlite>=0.21.0,<1.0.0"
1516
]
1617

1718
requires-python = ">=3.9,<3.14"

0 commit comments

Comments
 (0)