Skip to content

Commit 5ce46a8

Browse files
committed
feat(pool): add connection pooling support
- Introduce ConnectionPool class for managing SQLite connections. - Update SQLiteVecClient to support optional connection pooling. - Add example demonstrating connection pooling usage. - Implement comprehensive tests for connection pooling functionality.
1 parent 809570a commit 5ce46a8

File tree

8 files changed

+340
-9
lines changed

8 files changed

+340
-9
lines changed

CHANGELOG.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,20 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## [2.1.0] - 2025-01-31
9+
10+
### Added
11+
- Connection pooling support via `ConnectionPool` class
12+
- Thread-local connection reuse (one connection per thread) for concurrent access scenarios
13+
- Optional `pool` parameter in `SQLiteVecClient` constructor
14+
- `examples/connection_pool_example.py` demonstrating pooled connections
15+
- Comprehensive tests for connection pooling in `tests/test_pool.py`
16+
17+
### Improved
18+
- Better resource management for multi-threaded applications
19+
- Connection reuse reduces overhead in high-concurrency scenarios
20+
- Backward compatible - pooling is optional
21+
822
## [2.0.0] - 2025-01-30
923

1024
### Changed

TODO

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@
6262
## 🔵 Low Priority (New Features)
6363

6464
### Performance
65-
- [ ] Connection pooling support
65+
- [x] Connection pooling support
6666
- [x] Batch update operation
6767
- [x] Lazy loading option
6868
- [ ] Index strategy documentation
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
"""Example demonstrating connection pooling for concurrent access."""
2+
3+
import concurrent.futures
4+
import time
5+
6+
from sqlite_vec_client import ConnectionPool, SQLiteVecClient
7+
8+
9+
def worker_task(worker_id: int, pool: ConnectionPool) -> tuple[int, float]:
    """Simulate a worker performing database operations.

    Creates a client on the calling thread's pooled connection, creates a
    per-worker table, inserts ten documents and runs one similarity search.

    Args:
        worker_id: Worker identifier; also used to name the worker's table
        pool: Connection pool to use

    Returns:
        Tuple of (worker_id, execution_time) with the time in seconds
    """
    # perf_counter is monotonic, so the measured duration cannot be skewed
    # by wall-clock adjustments while the worker runs.
    start = time.perf_counter()

    # Create client with pooled connection (no db_path needed)
    client = SQLiteVecClient(table=f"docs_{worker_id}", pool=pool)
    try:
        # Create table if needed
        client.create_table(dim=384)

        # Add some data
        texts = [f"Document {i} from worker {worker_id}" for i in range(10)]
        embeddings = [[0.1 * i] * 384 for i in range(10)]
        client.add(texts, embeddings)

        # Perform similarity search (result is not needed for the demo)
        query = [0.5] * 384
        client.similarity_search(query, top_k=5)
    finally:
        # Always hand the connection back to the pool, even if an operation
        # above raised.
        client.close()

    elapsed = time.perf_counter() - start
    return worker_id, elapsed
41+
42+
43+
def main() -> None:
    """Demonstrate connection pooling with concurrent workers."""
    print("Connection Pooling Example")
    print("=" * 50)

    # Create connection pool
    pool_size = 5
    pool = ConnectionPool(
        connection_factory=lambda: SQLiteVecClient.create_connection("./pooled.db"),
        pool_size=pool_size,
    )

    print(f"Created connection pool with size={pool_size}\n")

    # Run concurrent workers
    num_workers = 10
    print(f"Running {num_workers} concurrent workers...")

    try:
        # The pool hands out one connection per thread and raises once
        # pool_size connections exist, so the executor must not start more
        # threads than the pool can serve. The tasks are queued across them.
        with concurrent.futures.ThreadPoolExecutor(max_workers=pool_size) as executor:
            futures = [
                executor.submit(worker_task, i, pool) for i in range(num_workers)
            ]
            results = [f.result() for f in concurrent.futures.as_completed(futures)]

        # Print results
        print("\nWorker execution times:")
        for worker_id, elapsed in sorted(results):
            print(f"  Worker {worker_id}: {elapsed:.3f}s")

        avg_time = sum(t for _, t in results) / len(results)
        print(f"\nAverage execution time: {avg_time:.3f}s")
    finally:
        # Cleanup runs even when a worker failed and f.result() re-raised.
        pool.close_all()

    print("\nPool closed successfully")


if __name__ == "__main__":
    main()

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
44

55
[project]
66
name = "sqlite-vec-client"
7-
version = "2.0.0"
7+
version = "2.1.0"
88
description = "A tiny Python client around sqlite-vec for CRUD and similarity search."
99
readme = "README.md"
1010
requires-python = ">=3.9"

sqlite_vec_client/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@
1313
VecClientError,
1414
)
1515
from .logger import get_logger
16+
from .pool import ConnectionPool
1617

1718
__all__ = [
1819
"SQLiteVecClient",
20+
"ConnectionPool",
1921
"VecClientError",
2022
"ValidationError",
2123
"TableNameError",

sqlite_vec_client/base.py

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from collections.abc import Generator
1313
from contextlib import contextmanager
1414
from types import TracebackType
15-
from typing import Any, Literal
15+
from typing import TYPE_CHECKING, Any, Literal
1616

1717
import sqlite_vec
1818

@@ -29,6 +29,9 @@
2929
validate_top_k,
3030
)
3131

32+
if TYPE_CHECKING:
33+
from .pool import ConnectionPool
34+
3235
logger = get_logger()
3336

3437

@@ -91,22 +94,34 @@ def rows_to_results(rows: list[sqlite3.Row]) -> list[Result]:
9194
for row in rows
9295
]
9396

94-
def __init__(
    self, table: str, db_path: str | None = None, pool: ConnectionPool | None = None
) -> None:
    """Initialize the client for a given base table and database file.

    When both ``pool`` and ``db_path`` are given, the pool takes precedence
    and ``db_path`` is ignored.

    Args:
        table: Name of the base table
        db_path: Path to SQLite database file (required if pool is None)
        pool: Optional connection pool for connection reuse

    Raises:
        TableNameError: If table name is invalid
        VecConnectionError: If connection fails
        ValueError: If both db_path and pool are None
    """
    validate_table_name(table)
    self.table = table
    self._in_transaction = False
    self._pool = pool
    logger.debug(f"Initializing SQLiteVecClient for table: {table}")

    # Compare against None explicitly: the documented contract is "both are
    # None", and a falsy-but-valid db_path (e.g. "") must still reach
    # create_connection as it did before pooling was introduced.
    if pool is not None:
        self.connection = pool.get_connection()
        logger.debug("Using connection from pool")
    elif db_path is not None:
        self.connection = self.create_connection(db_path)
    else:
        raise ValueError("Either db_path or pool must be provided")
110125

111126
def __enter__(self) -> SQLiteVecClient:
112127
"""Support context manager protocol and return `self`."""
@@ -518,10 +533,14 @@ def transaction(self) -> Generator[None, None, None]:
518533
self._in_transaction = False
519534

520535
def close(self) -> None:
    """Release the client's connection without ever raising.

    A client created with a pool hands its connection back to that pool;
    otherwise the underlying SQLite connection is closed. Any error raised
    while doing so is logged as a warning and suppressed.
    """
    try:
        table = self.table
        logger.debug(f"Closing connection for table '{table}'")
        pool = self._pool
        if not pool:
            # Stand-alone client: this object owns the connection.
            self.connection.close()
            logger.info(f"Connection closed for table '{table}'")
            return
        # Pooled client: the pool decides what happens to the connection.
        pool.return_connection(self.connection)
        logger.info(f"Connection returned to pool for table '{table}'")
    except Exception as exc:
        logger.warning(f"Error closing connection: {exc}")

sqlite_vec_client/pool.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
"""Connection pooling for SQLite connections."""
2+
3+
from __future__ import annotations
4+
5+
import sqlite3
6+
import threading
7+
from typing import Callable
8+
9+
from .logger import get_logger
10+
11+
logger = get_logger()
12+
13+
14+
class ConnectionPool:
    """Thread-local connection pool for SQLite databases.

    Note: SQLite connections are not thread-safe. This pool creates
    one connection per thread and reuses it within that thread.

    At most ``pool_size`` connections are created between calls to
    :meth:`close_all`. A slot is not released when the thread that owns it
    exits; only :meth:`close_all` resets the count.
    """

    def __init__(
        self,
        connection_factory: Callable[[], sqlite3.Connection],
        pool_size: int = 5,
    ) -> None:
        """Initialize connection pool.

        Args:
            connection_factory: Function that creates new connections
            pool_size: Maximum number of connections (one per thread)

        Raises:
            ValueError: If pool_size is less than 1
        """
        if pool_size < 1:
            raise ValueError("pool_size must be at least 1")

        self._factory = connection_factory
        self._pool_size = pool_size
        self._local = threading.local()
        self._lock = threading.Lock()
        self._created = 0
        self._all_connections: list[sqlite3.Connection] = []
        # Bumped by close_all() so that connections cached in other threads'
        # thread-local storage can be recognised as stale.
        self._generation = 0
        logger.debug(f"Initialized connection pool with size={pool_size}")

    def get_connection(self) -> sqlite3.Connection:
        """Get a connection for the current thread.

        The first call from a thread creates a connection through the
        factory; later calls from the same thread return that connection
        until :meth:`close_all` is called, after which a fresh one is created.

        Returns:
            SQLite connection for this thread

        Raises:
            RuntimeError: If ``pool_size`` connections already exist and the
                current thread has no usable connection
        """
        local = self._local
        cached = getattr(local, "connection", None)
        # A cached connection is only valid if it was created after the most
        # recent close_all(); otherwise it has already been closed.
        if cached is not None and getattr(local, "generation", None) == self._generation:
            logger.debug("Reused thread-local connection")
            return cached

        with self._lock:
            if self._created >= self._pool_size:
                raise RuntimeError(f"Connection pool exhausted (max={self._pool_size})")
            conn = self._factory()
            self._created += 1
            self._all_connections.append(conn)
            local.connection = conn
            local.generation = self._generation
            logger.debug(f"Created new connection ({self._created}/{self._pool_size})")
            return conn

    def return_connection(self, conn: sqlite3.Connection) -> None:
        """Return a connection (no-op for thread-local pool).

        Args:
            conn: Connection to return (kept in thread-local storage)
        """
        logger.debug("Connection kept in thread-local storage")

    def close_all(self) -> None:
        """Close all connections in the pool.

        Errors raised while closing an individual connection are logged and
        do not stop the remaining connections from being closed. Afterwards
        the pool is empty and can hand out new connections again.
        """
        # NOTE(review): sqlite3 connections opened with the default
        # check_same_thread=True refuse close() from a thread other than the
        # one that created them; such failures end up in the warning below.
        # Confirm how the connection factory opens its connections.
        logger.debug("Closing all connections in pool")
        with self._lock:
            for conn in self._all_connections:
                try:
                    conn.close()
                except Exception as e:
                    logger.warning(f"Error closing connection: {e}")
            closed = len(self._all_connections)
            self._all_connections.clear()
            self._created = 0
            # Invalidate every thread's cached connection so get_connection()
            # never hands out one that was just closed.
            self._generation += 1
            logger.info(f"Closed {closed} connections from pool")

0 commit comments

Comments
 (0)