33namespace Amp \Postgres ;
44
55use Amp \Coroutine ;
6+ use Amp \Loop ;
67use Amp \Promise ;
78use Amp \Sql \Common \ConnectionPool ;
89use Amp \Sql \Common \StatementPool as SqlStatementPool ;
1213use Amp \Sql \ResultSet as SqlResultSet ;
1314use Amp \Sql \Statement as SqlStatement ;
1415use Amp \Sql \Transaction as SqlTransaction ;
16+ use Amp \Success ;
17+ use cash \LRUCache ;
1518use function Amp \call ;
1619
1720final class Pool extends ConnectionPool implements Link
@@ -23,7 +26,13 @@ final class Pool extends ConnectionPool implements Link
2326 private $ listenerCount = 0 ;
2427
2528 /** @var bool */
26- private $ resetConnections = true ;
29+ private $ resetConnections ;
30+
31+ /** @var string */
32+ private $ statementWatcher ;
33+
34+ /** @var LRUCache|\IteratorAggregate Least-recently-used cache of StatementPool objects. */
35+ private $ statements ;
2736
2837 /**
2938 * @param ConnectionConfig $config
@@ -40,6 +49,45 @@ public function __construct(
4049 Connector $ connector = null
4150 ) {
4251 parent ::__construct ($ config , $ maxConnections , $ idleTimeout , $ connector );
52+
53+ $ this ->resetConnections = $ resetConnections ;
54+
55+ $ this ->statements = $ statements = new class ($ maxConnections ) extends LRUCache implements \IteratorAggregate {
56+ public function getIterator (): \Iterator
57+ {
58+ foreach ($ this ->data as $ key => $ data ) {
59+ yield $ key => $ data ;
60+ }
61+ }
62+ };
63+
64+ $ this ->statementWatcher = Loop::repeat (1000 , static function () use (&$ idleTimeout , $ statements ) {
65+ $ now = \time ();
66+
67+ foreach ($ statements as $ hash => $ statement ) {
68+ \assert ($ statement instanceof StatementPool);
69+
70+ if ($ statement ->getLastUsedAt () + $ idleTimeout > $ now ) {
71+ return ;
72+ }
73+
74+ $ statements ->remove ($ hash );
75+ }
76+ });
77+
78+ Loop::unreference ($ this ->statementWatcher );
79+ }
80+
81+ public function __destruct ()
82+ {
83+ parent ::__destruct ();
84+ Loop::cancel ($ this ->statementWatcher );
85+ }
86+
87+ public function close ()
88+ {
89+ parent ::close ();
90+ $ this ->statements ->clear ();
4391 }
4492
4593 /**
@@ -84,6 +132,45 @@ protected function pop(): \Generator
84132 return $ connection ;
85133 }
86134
135+ /**
136+ * {@inheritdoc}
137+ *
138+ * Caches prepared statements for reuse.
139+ */
140+ public function prepare (string $ sql ): Promise
141+ {
142+ if (!$ this ->isAlive ()) {
143+ throw new \Error ("The pool has been closed " );
144+ }
145+
146+ if ($ this ->statements ->containsKey ($ sql )) {
147+ $ statement = $ this ->statements ->get ($ sql );
148+ \assert ($ statement instanceof SqlStatement);
149+ if ($ statement ->isAlive ()) {
150+ return new Success ($ statement );
151+ }
152+ }
153+
154+ return call (function () use ($ sql ) {
155+ $ statement = yield parent ::prepare ($ sql );
156+ \assert ($ statement instanceof SqlStatement);
157+ $ this ->statements ->put ($ sql , $ statement );
158+ return $ statement ;
159+ });
160+ }
161+
162+ /**
163+ * {@inheritdoc}
164+ */
165+ public function execute (string $ sql , array $ params = []): Promise
166+ {
167+ return call (function () use ($ sql , $ params ) {
168+ $ statement = yield $ this ->prepare ($ sql );
169+ \assert ($ statement instanceof SqlStatement);
170+ return yield $ statement ->execute ($ params );
171+ });
172+ }
173+
87174 /**
88175 * {@inheritdoc}
89176 */
0 commit comments