33namespace Amp \Postgres ;
44
55use Amp \Coroutine ;
6- use Amp \Loop ;
76use Amp \Promise ;
87use Amp \Sql \Common \ConnectionPool ;
98use Amp \Sql \Common \StatementPool as SqlStatementPool ;
1312use Amp \Sql \ResultSet as SqlResultSet ;
1413use Amp \Sql \Statement as SqlStatement ;
1514use Amp \Sql \Transaction as SqlTransaction ;
16- use cash \LRUCache ;
1715use function Amp \call ;
1816
1917final class Pool extends ConnectionPool implements Link
@@ -27,12 +25,6 @@ final class Pool extends ConnectionPool implements Link
2725 /** @var bool */
2826 private $ resetConnections ;
2927
30- /** @var string */
31- private $ statementWatcher ;
32-
33- /** @var LRUCache|\IteratorAggregate Least-recently-used cache of StatementPool objects. */
34- private $ statements ;
35-
3628 /**
3729 * @param ConnectionConfig $config
3830 * @param int $maxConnections
@@ -50,45 +42,6 @@ public function __construct(
5042 parent ::__construct ($ config , $ maxConnections , $ idleTimeout , $ connector );
5143
5244 $ this ->resetConnections = $ resetConnections ;
53-
54- $ this ->statements = $ statements = new class ($ maxConnections ) extends LRUCache implements \IteratorAggregate {
55- public function getIterator (): \Iterator
56- {
57- yield from $ this ->data ;
58- }
59- };
60-
61- $ this ->statementWatcher = Loop::repeat (1000 , static function () use (&$ idleTimeout , $ statements ) {
62- $ now = \time ();
63-
64- foreach ($ statements as $ sql => $ statement ) {
65- if ($ statement instanceof Promise) {
66- continue ;
67- }
68-
69- \assert ($ statement instanceof StatementPool);
70-
71- if ($ statement ->getLastUsedAt () + $ idleTimeout > $ now ) {
72- return ;
73- }
74-
75- $ statements ->remove ($ sql );
76- }
77- });
78-
79- Loop::unreference ($ this ->statementWatcher );
80- }
81-
82- public function __destruct ()
83- {
84- parent ::__destruct ();
85- Loop::cancel ($ this ->statementWatcher );
86- }
87-
88- public function close ()
89- {
90- parent ::close ();
91- $ this ->statements ->clear ();
9245 }
9346
9447 /**
@@ -133,63 +86,6 @@ protected function pop(): \Generator
13386 return $ connection ;
13487 }
13588
136- /**
137- * {@inheritdoc}
138- *
139- * Caches prepared statements for reuse.
140- */
141- public function prepare (string $ sql ): Promise
142- {
143- if (!$ this ->isAlive ()) {
144- throw new \Error ("The pool has been closed " );
145- }
146-
147- return call (function () use ($ sql ) {
148- $ name = Handle::STATEMENT_NAME_PREFIX . \sha1 ($ sql );
149-
150- if ($ this ->statements ->containsKey ($ name )) {
151- $ statement = $ this ->statements ->get ($ name );
152-
153- if ($ statement instanceof Promise) {
154- $ statement = yield $ statement ; // Wait for prior request to resolve.
155- }
156-
157- \assert ($ statement instanceof StatementPool);
158-
159- if ($ statement ->isAlive ()) {
160- return $ statement ;
161- }
162- }
163-
164- $ promise = parent ::prepare ($ sql );
165- $ this ->statements ->put ($ name , $ promise ); // Insert promise into queue so subsequent requests get promise.
166-
167- try {
168- $ statement = yield $ promise ;
169- \assert ($ statement instanceof StatementPool);
170- } catch (\Throwable $ exception ) {
171- $ this ->statements ->remove ($ name );
172- throw $ exception ;
173- }
174-
175- $ this ->statements ->put ($ name , $ statement ); // Replace promise in queue with statement object.
176-
177- return $ statement ;
178- });
179- }
180-
181- /**
182- * {@inheritdoc}
183- */
184- public function execute (string $ sql , array $ params = []): Promise
185- {
186- return call (function () use ($ sql , $ params ) {
187- $ statement = yield $ this ->prepare ($ sql );
188- \assert ($ statement instanceof SqlStatement);
189- return yield $ statement ->execute ($ params );
190- });
191- }
192-
19389 /**
19490 * {@inheritdoc}
19591 */
0 commit comments