1717use Revolt \EventLoop ;
1818use function Amp \async ;
1919
20- /** @internal */
20+ /**
21+ * @internal
22+ *
23+ * @psalm-type PgSqlTypeMap = array<int, PgSqlType> Map of OID to corresponding PgSqlType.
24+ */
2125final class PgSqlHandle extends AbstractHandle
2226{
27+ private const TYPE_QUERY = <<<SQL
28+ SELECT t.oid, t.typcategory, t.typname, t.typdelim, t.typelem
29+ FROM pg_catalog.pg_type t JOIN pg_catalog.pg_namespace n ON t.typnamespace=n.oid
30+ WHERE t.typisdefined AND n.nspname IN ('pg_catalog', 'public') ORDER BY t.oid
31+ SQL ;
32+
2333 private const DIAGNOSTIC_CODES = [
2434 \PGSQL_DIAG_SEVERITY => "severity " ,
2535 \PGSQL_DIAG_SQLSTATE => "sqlstate " ,
@@ -35,16 +45,16 @@ final class PgSqlHandle extends AbstractHandle
3545 \PGSQL_DIAG_SOURCE_FUNCTION => "source_function " ,
3646 ];
3747
38- /** @var array<string, array<int, PgSqlType >> */
48+ /** @var array<string, Future<PgSqlTypeMap >> */
3949 private static array $ typeCache ;
4050
4151 private static ?\Closure $ errorHandler = null ;
4252
4353 /** @var \PgSql\Connection PostgreSQL connection handle. */
4454 private ?\PgSql \Connection $ handle ;
4555
46- /** @var array<int, PgSqlType> */
47- private readonly array $ types ;
56+ /** @var PgSqlTypeMap|null */
57+ private ? array $ types = null ;
4858
4959 /** @var array<non-empty-string, StatementStorage<string>> */
5060 private array $ statements = [];
@@ -57,13 +67,11 @@ final class PgSqlHandle extends AbstractHandle
5767 public function __construct (
5868 \PgSql \Connection $ handle ,
5969 $ socket ,
60- string $ id ,
70+ private readonly string $ id ,
6171 PostgresConfig $ config ,
6272 ) {
6373 $ this ->handle = $ handle ;
6474
65- $ this ->types = (self ::$ typeCache [$ id ] ??= self ::fetchTypes ($ handle ));
66-
6775 $ handle = &$ this ->handle ;
6876 $ lastUsedAt = &$ this ->lastUsedAt ;
6977 $ deferred = &$ this ->pendingOperation ;
@@ -171,35 +179,66 @@ public function __construct(
171179 }
172180
173181 /**
174- * @return array<int, PgSqlType >
182+ * @return Future<PgSqlTypeMap >
175183 */
176- private static function fetchTypes (\ PgSql \ Connection $ handle ): array
184+ private function fetchTypes (): Future
177185 {
178- $ result = \pg_query ( $ handle , " SELECT t.oid, t.typcategory, t.typname, t.typdelim, t.typelem
179- FROM pg_catalog.pg_type t JOIN pg_catalog.pg_namespace n ON t.typnamespace=n.oid
180- WHERE t.typisdefined AND n.nspname IN ('pg_catalog', 'public') ORDER BY t.oid " );
186+ if ( isset ( self :: $ typeCache [ $ this -> id ])) {
187+ return self :: $ typeCache [ $ this -> id ];
188+ }
181189
182- if ($ result === false ) {
183- throw new SqlException (\pg_last_error ($ handle ));
190+ \assert ($ this ->pendingOperation === null , 'Operation pending when fetching types! ' );
191+
192+ if ($ this ->handle === null ) {
193+ throw new \Error ("The connection to the database has been closed " );
184194 }
185195
186- $ types = [];
187- while ($ row = \pg_fetch_array ($ result , mode: \PGSQL_NUM )) {
188- [$ oid , $ typeCategory , $ typeName , $ delimiter , $ element ] = $ row ;
196+ $ result = \pg_send_query ($ this ->handle , self ::TYPE_QUERY );
197+ if ($ result === false ) {
198+ $ this ->close ();
199+ throw new SqlException (\pg_last_error ($ this ->handle ));
200+ }
189201
190- \assert (
191- \is_numeric ($ oid ) && \is_numeric ($ element ),
192- "OID and element type expected to be integers " ,
193- );
194- \assert (
195- \is_string ($ typeCategory ) && \is_string ($ typeName ) && \is_string ($ delimiter ),
196- "Unexpected types in type catalog query results " ,
197- );
202+ $ this ->pendingOperation = $ queryDeferred = new DeferredFuture ();
203+ $ typesDeferred = new DeferredFuture ();
198204
199- $ types [(int ) $ oid ] = new PgSqlType ($ typeCategory , $ typeName , $ delimiter , (int ) $ element );
205+ EventLoop::reference ($ this ->poll );
206+ if ($ result === 0 ) {
207+ EventLoop::enable ($ this ->await );
200208 }
201209
202- return $ types ;
210+ EventLoop::queue (function () use ($ queryDeferred , $ typesDeferred ): void {
211+ try {
212+ $ result = $ queryDeferred ->getFuture ()->await ();
213+ if (\pg_result_status ($ result ) !== \PGSQL_TUPLES_OK ) {
214+ throw new SqlException (\pg_result_error ($ result ));
215+ }
216+
217+ $ types = [];
218+ while ($ row = \pg_fetch_array ($ result , mode: \PGSQL_NUM )) {
219+ [$ oid , $ category , $ name , $ delimiter , $ element ] = $ row ;
220+
221+ \assert (
222+ \is_numeric ($ oid ) && \is_numeric ($ element ),
223+ "OID and element type expected to be integers " ,
224+ );
225+ \assert ( // For Psalm
226+ \is_string ($ category ) && \is_string ($ name ) && \is_string ($ delimiter ),
227+ "Unexpected nulls in type catalog query results " ,
228+ );
229+
230+ $ types [(int ) $ oid ] = new PgSqlType ($ category , $ name , $ delimiter , (int ) $ element );
231+ }
232+
233+ $ typesDeferred ->complete ($ types );
234+ } catch (\Throwable $ exception ) {
235+ $ this ->close ();
236+ $ typesDeferred ->error ($ exception );
237+ unset(self ::$ typeCache [$ this ->id ]);
238+ }
239+ });
240+
241+ return self ::$ typeCache [$ this ->id ] = $ typesDeferred ->getFuture ();
203242 }
204243
205244 private static function getErrorHandler (): \Closure
@@ -224,12 +263,12 @@ public function isClosed(): bool
224263 * @param \Closure $function Function to execute.
225264 * @param mixed ...$args Arguments to pass to function.
226265 *
227- * @return \PgSql\Result
228- *
229266 * @throws SqlException
230267 */
231268 private function send (\Closure $ function , mixed ...$ args ): mixed
232269 {
270+ $ this ->types ??= $ this ->fetchTypes ()->await ();
271+
233272 while ($ this ->pendingOperation ) {
234273 try {
235274 $ this ->pendingOperation ->getFuture ()->await ();
@@ -275,6 +314,8 @@ private function createResult(\PgSql\Result $result, string $sql): PostgresResul
275314 throw new \Error ("The connection to the database has been closed " );
276315 }
277316
317+ \assert ($ this ->types !== null , 'Expected type array to be populated before creating a result ' );
318+
278319 switch (\pg_result_status ($ result )) {
279320 case \PGSQL_EMPTY_QUERY :
280321 throw new SqlQueryError ("Empty query string " );
0 commit comments