From 3a0356f5237376c97c72e58055609db6ea48a7b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20K=C5=82eczek?= Date: Tue, 5 May 2026 06:26:38 +0200 Subject: [PATCH] fix: Start listening after schema cache load This change ensures PostgREST starts listening on a server socket only after it loaded the schema cache and is ready to handle requests. It is no longer going to return 503 errors during startup until the schema cache is loaded. --- CHANGELOG.md | 1 + src/PostgREST/Admin.hs | 16 ++++---- src/PostgREST/App.hs | 67 +++++++++++++++++++-------------- src/PostgREST/AppState.hs | 23 ++++++++--- src/PostgREST/Config.hs | 4 +- src/PostgREST/SchemaCache.hs | 9 +++-- test/io/test_io.py | 40 ++++++++++++++------ test/observability/ObsHelper.hs | 3 +- test/spec/SpecHelper.hs | 3 +- 9 files changed, 106 insertions(+), 60 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 062e6194a5..a60d62e213 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ All notable changes to this project will be documented in this file. 
From versio - Remove automatic transaction retries on `40001 (serialization_failure)` errors to prevent replication lag by @laurenceisla in #3673 - Fix unexpected results when embedding and filtering the same table more than once by @laurenceisla in #4075 - If the schema cache fails to reload, PostgREST will no longer stop serving requests and will continue doing so in a "best effort" basis by @mkleczek in #4873 #4869 +- Stop reporting 503 errors unnecessarily while the schema cache is loading at startup by @mkleczek in #4880 ### Changed diff --git a/src/PostgREST/Admin.hs b/src/PostgREST/Admin.hs index 99733a6995..f8501417be 100644 --- a/src/PostgREST/Admin.hs +++ b/src/PostgREST/Admin.hs @@ -22,20 +22,20 @@ import qualified PostgREST.AppState as AppState import qualified Network.Socket as NS import Protolude -runAdmin :: AppState -> Maybe NS.Socket -> NS.Socket -> Warp.Settings -> IO () -runAdmin appState maybeAdminSocket socketREST settings = do +runAdmin :: AppState -> Maybe NS.Socket -> IO (Maybe NS.Socket) -> Warp.Settings -> IO () +runAdmin appState maybeAdminSocket getSocketREST settings = do whenJust maybeAdminSocket $ \adminSocket -> do address <- resolveSocketToAddress adminSocket observer $ AdminStartObs address void . forkIO $ Warp.runSettingsSocket settings adminSocket adminApp where - adminApp = admin appState socketREST + adminApp = admin appState getSocketREST observer = AppState.getObserver appState -- | PostgREST admin application -admin :: AppState.AppState -> NS.Socket -> Wai.Application -admin appState socketREST req respond = do - isMainAppReachable <- isRight <$> reachMainApp socketREST +admin :: AppState.AppState -> IO (Maybe NS.Socket) -> Wai.Application +admin appState getSocketREST req respond = do + isMainAppReachable <- getSocketREST >>= maybe (pure False) (fmap isRight . 
reachMainApp) isLoaded <- AppState.isLoaded appState isPending <- AppState.isPending appState @@ -44,8 +44,8 @@ admin appState socketREST req respond = do respond $ Wai.responseLBS (if isMainAppReachable then HTTP.status200 else HTTP.status500) [] mempty ["ready"] -> let - status | not isMainAppReachable = HTTP.status500 - | isPending = HTTP.status503 + status | isPending = HTTP.status503 + | not isMainAppReachable = HTTP.status500 | isLoaded = HTTP.status200 | otherwise = HTTP.status500 in diff --git a/src/PostgREST/App.hs b/src/PostgREST/App.hs index da2ce9ac1a..3244a241b7 100644 --- a/src/PostgREST/App.hs +++ b/src/PostgREST/App.hs @@ -26,7 +26,9 @@ import System.IO.Error (ioeGetErrorType) import Control.Monad.Except (liftEither) import Control.Monad.Extra (whenJust) import Data.Either.Combinators (mapLeft, whenLeft) -import Data.String (IsString (..)) +import Data.IORef (atomicWriteIORef, newIORef, + readIORef) +import Data.String (IsString (..), String) import Network.Wai.Handler.Warp (defaultSettings, setHost, setOnException, setPort, setServerName) @@ -71,28 +73,38 @@ import qualified Network.HTTP.Types as HTTP import Network.HTTP.Types.Header (hVary) import qualified Network.Socket as NS import PostgREST.Unix (createAndBindDomainSocket) -import Protolude hiding (Handler) +import System.Posix.Types (FileMode) + +import Protolude hiding (Handler) run :: AppState -> IO () run appState = do conf <- AppState.getConfig appState - AppState.schemaCacheLoader appState -- Loads the initial SchemaCache - (mainSocket, adminSocket) <- initSockets conf + mainSocketRef <- newIORef Nothing + adminSocket <- initAdminServerSocket conf + let closeSockets = do whenJust adminSocket NS.close - NS.close mainSocket + readIORef mainSocketRef >>= foldMap NS.close Unix.installSignalHandlers observer closeSockets (AppState.schemaCacheLoader appState) (AppState.readInDbConfig False appState) + Admin.runAdmin appState adminSocket (readIORef mainSocketRef) (serverSettings conf) + 
Listener.runListener appState - Admin.runAdmin appState adminSocket mainSocket (serverSettings conf) + -- Kick off and wait for the initial SchemaCache load before creating the + -- main API socket. + AppState.schemaCacheLoader appState + AppState.waitForSchemaCacheInit appState + + mainSocket <- initServerSocket conf + atomicWriteIORef mainSocketRef $ Just mainSocket let app = postgrest appState (AppState.schemaCacheLoader appState) - do - address <- resolveSocketToAddress mainSocket - observer $ AppServerAddressObs address + address <- resolveSocketToAddress mainSocket + observer $ AppServerAddressObs address Warp.runSettingsSocket (serverSettings conf & setOnException onWarpException) mainSocket app where @@ -255,24 +267,23 @@ addRetryHint delay response = do isServiceUnavailable :: Wai.Response -> Bool isServiceUnavailable response = Wai.responseStatus response == HTTP.status503 -type AppSockets = (NS.Socket, Maybe NS.Socket) - -initSockets :: AppConfig -> IO AppSockets -initSockets AppConfig{..} = do - sock <- case configServerUnixSocket of +initSocket :: (Applicative f, Traversable f) => Maybe String -> FileMode -> Text -> f Int -> IO (f NS.Socket) +initSocket unixSocket unixSocketMode tcpHost tcpPort = + maybe initTCPSocket initDomainSocket unixSocket + where + initTCPSocket = traverse (`bindPortTCP` (fromString $ T.unpack tcpHost)) tcpPort -- I'm not using `streaming-commons`' bindPath function here because it's not defined for Windows, -- but we need to have runtime error if we try to use it in Windows, not compile time error - Just path -> createAndBindDomainSocket path configServerUnixSocketMode - Nothing -> bindPortTCP configServerPort (fromString $ T.unpack configServerHost) - - adminSock <- case configAdminServerUnixSocket of - Just path -> do - adminSock <- createAndBindDomainSocket path configAdminServerUnixSocketMode - pure $ Just adminSock - Nothing -> case configAdminServerPort of - Just adminPort -> do - adminSock <- bindPortTCP adminPort 
(fromString $ T.unpack configAdminServerHost) - pure $ Just adminSock - Nothing -> pure Nothing - - pure (sock, adminSock) + initDomainSocket = fmap pure . (`createAndBindDomainSocket` unixSocketMode) + +initServerSocket :: AppConfig -> IO NS.Socket +initServerSocket AppConfig{..} = + runIdentity <$> initSocket + configServerUnixSocket configServerUnixSocketMode + configServerHost (pure configServerPort) + +initAdminServerSocket :: AppConfig -> IO (Maybe NS.Socket) +initAdminServerSocket AppConfig{..} = + initSocket + configAdminServerUnixSocket configAdminServerUnixSocketMode + configAdminServerHost configAdminServerPort diff --git a/src/PostgREST/AppState.hs b/src/PostgREST/AppState.hs index 93dff41342..adec80677b 100644 --- a/src/PostgREST/AppState.hs +++ b/src/PostgREST/AppState.hs @@ -25,6 +25,7 @@ module PostgREST.AppState , getObserver , isLoaded , isPending + , waitForSchemaCacheInit ) where import qualified Data.ByteString.Char8 as BS @@ -51,6 +52,9 @@ import Data.IORef (IORef, atomicWriteIORef, newIORef, readIORef) import Data.Time.Clock (UTCTime, getCurrentTime) +import Control.Concurrent.STM (TMVar, newEmptyTMVarIO, + putTMVar, readTMVar, + tryReadTMVar, tryTakeTMVar) import PostgREST.Auth.JwtCache (JwtCacheState, update) import PostgREST.Config (AppConfig (..), readAppConfig, @@ -98,9 +102,11 @@ data AppState = AppState } -- | Schema cache status. --- Empty means pending and full means loaded. +-- Empty means initial loading on startup, False means pending and True means loaded. +-- "Initial" state is needed so that we can wait with application socket listening +-- until after initial schema cache querying. 
newtype SchemaCacheStatus = SchemaCacheStatus - { getSCStatusMVar :: MVar () + { getSCStatusTMVar :: TMVar Bool } init :: AppConfig -> IO AppState @@ -368,16 +374,21 @@ retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThrea oneSecondInUs = 1000000 -- one second in microseconds newSchemaCacheStatus :: IO SchemaCacheStatus -newSchemaCacheStatus = SchemaCacheStatus <$> newEmptyMVar +newSchemaCacheStatus = SchemaCacheStatus <$> newEmptyTMVarIO markSchemaCachePending :: AppState -> IO () -markSchemaCachePending = void . tryTakeMVar . getSCStatusMVar . stateSCacheStatus +markSchemaCachePending = atomically . liftA2 (*>) tryTakeTMVar (`putTMVar` False) . getSCStatusTMVar . stateSCacheStatus markSchemaCacheLoaded :: AppState -> IO () -markSchemaCacheLoaded = void . (`tryPutMVar` ()) . getSCStatusMVar . stateSCacheStatus +markSchemaCacheLoaded = atomically . liftA2 (*>) tryTakeTMVar (`putTMVar` True) . getSCStatusTMVar . stateSCacheStatus isSchemaCacheLoaded :: AppState -> IO Bool -isSchemaCacheLoaded = fmap not . isEmptyMVar . getSCStatusMVar . stateSCacheStatus +isSchemaCacheLoaded = atomically . (pure . fromMaybe False <=< tryReadTMVar) . getSCStatusTMVar . stateSCacheStatus + +-- | Wait for initial schema cache load to either finish or retry +-- | We wait until scStatusTMVar is not empty. +waitForSchemaCacheInit :: AppState -> IO () +waitForSchemaCacheInit = atomically . void . readTMVar . getSCStatusTMVar . stateSCacheStatus -- | Reads the in-db config and reads the config file again -- | We don't retry reading the in-db config after it fails immediately, because it could have user errors. We just report the error and continue. 
diff --git a/src/PostgREST/Config.hs b/src/PostgREST/Config.hs index 2ae8ff39ba..3832088e17 100644 --- a/src/PostgREST/Config.hs +++ b/src/PostgREST/Config.hs @@ -128,7 +128,8 @@ data AppConfig = AppConfig , configAdminServerUnixSocketMode :: FileMode , configRoleSettings :: RoleSettings , configRoleIsoLvl :: RoleIsolationLvl - , configInternalSCQuerySleep :: Maybe Int32 + , configInternalSCQuerySleepFst :: Maybe Int32 + , configInternalSCQuerySleepSnd :: Maybe Int32 } data LogLevel = LogCrit | LogError | LogWarn | LogInfo | LogDebug @@ -332,6 +333,7 @@ parser optPath env dbSettings roleSettings roleIsolationLvl = <*> parseSocketFileMode "admin-server-unix-socket-mode" <*> pure roleSettings <*> pure roleIsolationLvl + <*> optInt "internal-schema-cache-query-sleep-before-queries" <*> optInt "internal-schema-cache-query-sleep" where parseErrorVerbosity :: C.Key -> C.Parser C.Config Verbosity diff --git a/src/PostgREST/SchemaCache.hs b/src/PostgREST/SchemaCache.hs index a487b70089..a598a82231 100644 --- a/src/PostgREST/SchemaCache.hs +++ b/src/PostgREST/SchemaCache.hs @@ -156,6 +156,9 @@ maxDbTablesForFuzzySearch = 500 querySchemaCache :: AppConfig -> SQL.Transaction (SchemaCache, Maybe QueryTimings) querySchemaCache conf@AppConfig{..} = do SQL.sql "set local schema ''" -- This voids the search path. 
The following queries need this for getting the fully qualified name(schema.name) of every db object + + for_ configInternalSCQuerySleepFst (`SQL.statement` sleepCall) -- only used for testing + tabs <- sqlTimedStmt gucTbls conf allTables keyDeps <- sqlTimedStmt gucKDeps conf allViewsKeyDependencies m2oRels <- sqlTimedStmt gucRels mempty allM2OandO2ORels @@ -166,9 +169,8 @@ querySchemaCache conf@AppConfig{..} = do tzones <- if configDbTimezoneEnabled then sqlTimedStmt gucTzones mempty timezones else pure S.empty - _ <- - let sleepCall = SQL.Statement "select pg_sleep($1 / 1000.0)" (param HE.int4) HD.noResult True in - for_ configInternalSCQuerySleep (`SQL.statement` sleepCall) -- only used for testing + + for_ configInternalSCQuerySleepSnd (`SQL.statement` sleepCall) -- only used for testing qsTime <- if isLogDebug @@ -195,6 +197,7 @@ querySchemaCache conf@AppConfig{..} = do schemas = toList configDbSchemas isLogDebug = configLogLevel == LogDebug sqlTimedStmt = sqlTimedStatement isLogDebug + sleepCall = SQL.Statement "select pg_sleep($1 / 1000.0)" (param HE.int4) HD.noResult True -- | overrides detected relationships with the computed relationships and gets the RelationshipsMap getOverrideRelationshipsMap :: [Relationship] -> [Relationship] -> RelationshipsMap diff --git a/test/io/test_io.py b/test/io/test_io.py index 1fde6332a5..e771eeef33 100644 --- a/test/io/test_io.py +++ b/test/io/test_io.py @@ -6,6 +6,7 @@ import subprocess import time import pytest +import requests from config import CONFIGSDIR, FIXTURES, SECRET from util import ( @@ -1090,7 +1091,7 @@ def test_empty_schema_cache_log_contains_jwt_role(defaultenv): env = { **defaultenv, - "PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP": "1000", + "PGRST_DB_SCHEMAS": "non_existent_schema_aaaa", "PGRST_JWT_SECRET": SECRET, } headers = jwtauthheader({"role": "postgrest_test_author"}, SECRET) @@ -1543,14 +1544,19 @@ def test_log_postgrest_host_and_port(host, defaultenv): with run( env=defaultenv, host=host, port=port, 
no_startup_stdout=False ) as postgrest: - output = postgrest.read_stdout(nlines=10) + output = postgrest.read_stdout(nlines=11) + # Cannot assume a particular log entry order + # Listening on a socket happens after schema querying + # but is concurrent with the schema loading process + # and might happen before or after writing of the + # "Schema cache loaded" log entry if is_unix: - re.match(r'API server listening on "/tmp/.*\.sock"', output[2]) + match_log(output, [r".*API server listening on .*/tmp/.*\.sock"]) elif is_ipv6(host): - assert f"API server listening on [{host}]:{port}" in output[2] + match_log(output, [r".*API server listening on \[.+]:\d+"]) else: # IPv4 - assert f"API server listening on {host}:{port}" in output[2] + match_log(output, [r".*API server listening on .+:\d+"]) def test_succeed_w_role_having_superuser_settings(defaultenv): @@ -1818,17 +1824,24 @@ def test_pgrst_log_503_client_error_to_stderr(defaultenv): assert any(log_message in line for line in output) -def test_log_error_when_empty_schema_cache_on_startup_to_stderr(defaultenv): - "Should log the 503 error message when there is an empty schema cache on startup" +def test_log_error_when_schema_cache_load_error_on_startup_to_stderr(defaultenv): + "Should log the 503 error message when there is an error loading schema cache on startup" env = { **defaultenv, - "PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP": "300", + "PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP_BEFORE_QUERIES": "1000", + "PGRST_DB_SCHEMAS": "non_existent_schema_aaaa", } with run(env=env, wait_for=None) as postgrest: postgrest.wait_until_scache_starts_loading() + # First call should fail with connection refused + with pytest.raises(requests.ConnectionError): + postgrest.session.get("/projects") + + # Next call should return 503 + time.sleep(1) response = postgrest.session.get("/projects") assert response.status_code == 503 @@ -1840,7 +1853,7 @@ def test_log_error_when_empty_schema_cache_on_startup_to_stderr(defaultenv): def 
test_no_double_schema_cache_reload_on_empty_schema(defaultenv): - "Should only load the schema cache once on a 503 error when there's an empty schema cache on startup" + "Should only load the schema cache once when there's an empty schema cache on startup" env = { **defaultenv, @@ -1850,12 +1863,15 @@ def test_no_double_schema_cache_reload_on_empty_schema(defaultenv): with run(env=env, wait_for=None) as postgrest: postgrest.wait_until_scache_starts_loading() - response = postgrest.session.get("/projects") - assert response.status_code == 503 + with pytest.raises(requests.ConnectionError): + postgrest.session.get("/projects") # Should wait enough time to load the schema cache twice to guarantee that the test is valid time.sleep(1) + response = postgrest.session.get("/projects") + assert response.status_code == 200 + response = postgrest.admin.get("/metrics") assert response.status_code == 200 assert 'pgrst_schema_cache_loads_total{status="SUCCESS"} 1.0' in response.text @@ -1937,7 +1953,7 @@ def test_schema_cache_error_observation(defaultenv): output = postgrest.read_stdout(nlines=9) assert ( "Failed to load the schema cache using db-schemas=public and db-extra-search-path=x" - in output[7] + in output[6] ) diff --git a/test/observability/ObsHelper.hs b/test/observability/ObsHelper.hs index 79da6bd11a..508060bf1a 100644 --- a/test/observability/ObsHelper.hs +++ b/test/observability/ObsHelper.hs @@ -119,7 +119,8 @@ baseCfg = let secret = encodeUtf8 "reallyreallyreallyreallyverysafe" in , configAdminServerUnixSocketMode = 432 , configRoleSettings = mempty , configRoleIsoLvl = mempty - , configInternalSCQuerySleep = Nothing + , configInternalSCQuerySleepFst = Nothing + , configInternalSCQuerySleepSnd = Nothing , configServerTimingEnabled = True } diff --git a/test/spec/SpecHelper.hs b/test/spec/SpecHelper.hs index 495c362554..78456b36df 100644 --- a/test/spec/SpecHelper.hs +++ b/test/spec/SpecHelper.hs @@ -160,7 +160,8 @@ baseCfg = let secret = encodeUtf8 
"reallyreallyreallyreallyverysafe" in , configAdminServerUnixSocketMode = 432 , configRoleSettings = mempty , configRoleIsoLvl = mempty - , configInternalSCQuerySleep = Nothing + , configInternalSCQuerySleepFst = Nothing + , configInternalSCQuerySleepSnd = Nothing , configServerTimingEnabled = True }