Skip to content

Commit 1c85100

Browse files
authored
Merge pull request #149 from tilezen/remove_my_postgres_connection_pools
Remove my postgres connection pools
2 parents 13892fc + b6f9d88 commit 1c85100

File tree

3 files changed

+41
-65
lines changed

3 files changed

+41
-65
lines changed

tilequeue/command.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -533,7 +533,7 @@ def tilequeue_process(cfg, peripherals):
533533
io_pool = ThreadPool(n_io_workers)
534534

535535
feature_fetcher = DataFetcher(cfg.postgresql_conn_info, all_layer_data,
536-
io_pool, n_layers, n_total_needed_query)
536+
io_pool, n_layers)
537537

538538
# create all queues used to manage pipeline
539539

tilequeue/postgresql.py

Lines changed: 36 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,68 +1,44 @@
11
from itertools import cycle
2-
from psycopg2.pool import ThreadedConnectionPool
32
from psycopg2.extras import register_hstore, register_json
3+
import psycopg2
44
import threading
5-
import ujson as json
6-
7-
8-
class DatabaseCycleConnectionPool(object):
9-
10-
"""
11-
Maintains a psycopg2 ThreadedConnectionPool for each of the
12-
given dbnames. When a client requests a set of connections,
13-
all of those connections will come from the same database.
14-
"""
15-
16-
def __init__(self, min_conns_per_db, max_conns_per_db, dbnames, conn_info):
17-
self._pools = []
18-
self._conns_to_pool = {}
19-
20-
for dbname in dbnames:
21-
pool = ThreadedConnectionPool(
22-
min_conns_per_db,
23-
max_conns_per_db,
24-
dbname=dbname,
25-
**conn_info
26-
)
27-
self._pools.append(pool)
28-
29-
self._pool_cycle = cycle(self._pools)
30-
self._lock = threading.Lock()
31-
32-
def get_conns(self, n_conns):
33-
conns = []
34-
35-
try:
36-
with self._lock:
37-
pool_to_use = next(self._pool_cycle)
38-
for _ in range(n_conns):
39-
conn = pool_to_use.getconn()
40-
self._conns_to_pool[id(conn)] = pool_to_use
41-
conns.append(conn)
42-
43-
conn.set_session(readonly=True, autocommit=True)
44-
register_json(conn, loads=json.loads, globally=True)
45-
register_hstore(conn, globally=True)
46-
assert len(conns) == n_conns, \
47-
"Couldn't collect enough connections"
48-
except:
49-
if conns:
50-
self.put_conns(conns)
51-
conns = []
52-
raise
53-
5+
import ujson
6+
7+
8+
class DBAffinityConnectionsNoLimit(object):

    # Similar to the db affinity pool, but without keeping track of
    # the connections. It's the caller's responsibility to call us
    # back with the connection objects so that we can close them.

    def __init__(self, dbnames, conn_info):
        """Set up a pool that cycles through `dbnames`.

        conn_info: keyword arguments for psycopg2.connect, *without*
        the database name, which is taken from `dbnames` per call to
        get_conns.
        """
        self.dbnames = cycle(dbnames)
        self.conn_info = conn_info
        # NOTE(review): conn_mapping is never read or updated anywhere
        # in this class; retained only in case external code inspects
        # it — TODO confirm and remove.
        self.conn_mapping = {}
        self.lock = threading.Lock()

    def _make_conn(self, conn_info):
        # Open a read-only, autocommit connection and register the
        # hstore and json adapters on it.
        conn = psycopg2.connect(**conn_info)
        conn.set_session(readonly=True, autocommit=True)
        register_hstore(conn)
        register_json(conn, loads=ujson.loads)
        return conn

    def get_conns(self, n_conn):
        """Return n_conn fresh connections, all to the same database.

        The lock ensures the cycle advances atomically, so concurrent
        callers each get a consistent single database.
        """
        with self.lock:
            # The next() builtin works on both Python 2 and Python 3
            # iterators, unlike the Python-2-only .next() method.
            dbname = next(self.dbnames)
            conn_info_with_db = dict(self.conn_info, dbname=dbname)
            conns = [self._make_conn(conn_info_with_db)
                     for _ in range(n_conn)]
        return conns

    def put_conns(self, conns):
        """Close the given connections, ignoring close failures."""
        for conn in conns:
            try:
                conn.close()
            except Exception:
                # Best-effort teardown: a connection that fails to
                # close (e.g. already dropped) must not prevent the
                # remaining ones from being closed. Narrowed from a
                # bare except so SystemExit/KeyboardInterrupt still
                # propagate.
                pass

    def closeall(self):
        # This pool deliberately does not track handed-out
        # connections, so a blanket close is impossible.
        raise Exception('DBAffinityConnectionsNoLimit pool does not track '
                        'connections')

tilequeue/query.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from psycopg2.extras import RealDictCursor
2-
from tilequeue.postgresql import DatabaseCycleConnectionPool
2+
from tilequeue.postgresql import DBAffinityConnectionsNoLimit
33
from tilequeue.tile import calc_meters_per_pixel_dim
44
from tilequeue.tile import coord_to_mercator_bounds
55
from tilequeue.transform import calculate_padded_bounds
@@ -150,15 +150,15 @@ def enqueue_queries(sql_conns, thread_pool, layer_data, zoom, unpadded_bounds):
150150

151151
class DataFetcher(object):
152152

    def __init__(self, conn_info, layer_data, io_pool, n_conn):
        # Copy conn_info so popping 'dbnames' below doesn't mutate the
        # caller's dict.
        self.conn_info = dict(conn_info)
        self.layer_data = layer_data
        self.io_pool = io_pool

        # Remove 'dbnames' so the remaining conn_info holds only the
        # keyword arguments for the actual database connection.
        self.dbnames = self.conn_info.pop('dbnames')
        self.dbnames_query_index = 0
        # Connection pool cycles through the configured database
        # names, handing out n_conn connections per fetch.
        self.sql_conn_pool = DBAffinityConnectionsNoLimit(
            self.dbnames, self.conn_info)
        self.n_conn = n_conn
163163

164164
def __call__(self, coord, layer_data=None):

0 commit comments

Comments
 (0)