Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ All notable changes to this project will be documented in this file. From versio
- Remove automatic transaction retries on `40001 (serialization_failure)` errors to prevent replication lag by @laurenceisla in #3673
- Fix unexpected results when embedding and filtering the same table more than once by @laurenceisla in #4075
- If the schema cache fails to reload, PostgREST will no longer stop serving requests and will continue doing so in a "best effort" basis by @mkleczek in #4873 #4869
- Stop reporting 503s errors unnecessarily while the schema cache is loading at startup by @mkleczek in #4880

### Changed

Expand Down
16 changes: 8 additions & 8 deletions src/PostgREST/Admin.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,20 @@ import qualified PostgREST.AppState as AppState
import qualified Network.Socket as NS
import Protolude

runAdmin :: AppState -> Maybe NS.Socket -> NS.Socket -> Warp.Settings -> IO ()
runAdmin appState maybeAdminSocket socketREST settings = do
runAdmin :: AppState -> Maybe NS.Socket -> IO (Maybe NS.Socket) -> Warp.Settings -> IO ()
runAdmin appState maybeAdminSocket getSocketREST settings = do
whenJust maybeAdminSocket $ \adminSocket -> do
address <- resolveSocketToAddress adminSocket
observer $ AdminStartObs address
void . forkIO $ Warp.runSettingsSocket settings adminSocket adminApp
where
adminApp = admin appState socketREST
adminApp = admin appState getSocketREST
observer = AppState.getObserver appState

-- | PostgREST admin application
admin :: AppState.AppState -> NS.Socket -> Wai.Application
admin appState socketREST req respond = do
isMainAppReachable <- isRight <$> reachMainApp socketREST
admin :: AppState.AppState -> IO (Maybe NS.Socket) -> Wai.Application
admin appState getSocketREST req respond = do
isMainAppReachable <- getSocketREST >>= maybe (pure False) (fmap isRight . reachMainApp)
isLoaded <- AppState.isLoaded appState
isPending <- AppState.isPending appState

Expand All @@ -44,8 +44,8 @@ admin appState socketREST req respond = do
respond $ Wai.responseLBS (if isMainAppReachable then HTTP.status200 else HTTP.status500) [] mempty
["ready"] ->
let
status | not isMainAppReachable = HTTP.status500
| isPending = HTTP.status503
status | isPending = HTTP.status503
| not isMainAppReachable = HTTP.status500
Comment thread
steve-chavez marked this conversation as resolved.
| isLoaded = HTTP.status200
| otherwise = HTTP.status500
in
Expand Down
67 changes: 39 additions & 28 deletions src/PostgREST/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import System.IO.Error (ioeGetErrorType)
import Control.Monad.Except (liftEither)
import Control.Monad.Extra (whenJust)
import Data.Either.Combinators (mapLeft, whenLeft)
import Data.String (IsString (..))
import Data.IORef (atomicWriteIORef, newIORef,
readIORef)
import Data.String (IsString (..), String)
import Network.Wai.Handler.Warp (defaultSettings, setHost,
setOnException, setPort,
setServerName)
Expand Down Expand Up @@ -71,28 +73,38 @@ import qualified Network.HTTP.Types as HTTP
import Network.HTTP.Types.Header (hVary)
import qualified Network.Socket as NS
import PostgREST.Unix (createAndBindDomainSocket)
import Protolude hiding (Handler)
import System.Posix.Types (FileMode)

import Protolude hiding (Handler)

run :: AppState -> IO ()
run appState = do
conf <- AppState.getConfig appState

AppState.schemaCacheLoader appState -- Loads the initial SchemaCache
(mainSocket, adminSocket) <- initSockets conf
mainSocketRef <- newIORef Nothing
adminSocket <- initAdminServerSocket conf

let closeSockets = do
whenJust adminSocket NS.close
NS.close mainSocket
readIORef mainSocketRef >>= foldMap NS.close
Unix.installSignalHandlers observer closeSockets (AppState.schemaCacheLoader appState) (AppState.readInDbConfig False appState)

Admin.runAdmin appState adminSocket (readIORef mainSocketRef) (serverSettings conf)

Listener.runListener appState

Admin.runAdmin appState adminSocket mainSocket (serverSettings conf)
-- Kick off and wait for the initial SchemaCache load before creating the
-- main API socket.
AppState.schemaCacheLoader appState
AppState.waitForSchemaCacheInit appState

mainSocket <- initServerSocket conf
atomicWriteIORef mainSocketRef $ Just mainSocket
Comment thread
steve-chavez marked this conversation as resolved.

let app = postgrest appState (AppState.schemaCacheLoader appState)

do
address <- resolveSocketToAddress mainSocket
observer $ AppServerAddressObs address
address <- resolveSocketToAddress mainSocket
observer $ AppServerAddressObs address

Warp.runSettingsSocket (serverSettings conf & setOnException onWarpException) mainSocket app
where
Expand Down Expand Up @@ -255,24 +267,23 @@ addRetryHint delay response = do
isServiceUnavailable :: Wai.Response -> Bool
isServiceUnavailable response = Wai.responseStatus response == HTTP.status503

type AppSockets = (NS.Socket, Maybe NS.Socket)

initSockets :: AppConfig -> IO AppSockets
initSockets AppConfig{..} = do
sock <- case configServerUnixSocket of
initSocket :: (Applicative f, Traversable f) => Maybe String -> FileMode -> Text -> f Int -> IO (f NS.Socket)
initSocket unixSocket unixSocketMode tcpHost tcpPort =
maybe initTCPSocket initDomainSocket unixSocket
where
initTCPSocket = traverse (`bindPortTCP` (fromString $ T.unpack tcpHost)) tcpPort
-- I'm not using `streaming-commons`' bindPath function here because it's not defined for Windows,
-- but we need to have runtime error if we try to use it in Windows, not compile time error
Just path -> createAndBindDomainSocket path configServerUnixSocketMode
Nothing -> bindPortTCP configServerPort (fromString $ T.unpack configServerHost)

adminSock <- case configAdminServerUnixSocket of
Just path -> do
adminSock <- createAndBindDomainSocket path configAdminServerUnixSocketMode
pure $ Just adminSock
Nothing -> case configAdminServerPort of
Just adminPort -> do
adminSock <- bindPortTCP adminPort (fromString $ T.unpack configAdminServerHost)
pure $ Just adminSock
Nothing -> pure Nothing

pure (sock, adminSock)
initDomainSocket = fmap pure . (`createAndBindDomainSocket` unixSocketMode)

initServerSocket :: AppConfig -> IO NS.Socket
initServerSocket AppConfig{..} =
runIdentity <$> initSocket
configServerUnixSocket configServerUnixSocketMode
configServerHost (pure configServerPort)

initAdminServerSocket :: AppConfig -> IO (Maybe NS.Socket)
initAdminServerSocket AppConfig{..} =
initSocket
configAdminServerUnixSocket configAdminServerUnixSocketMode
configAdminServerHost configAdminServerPort
23 changes: 17 additions & 6 deletions src/PostgREST/AppState.hs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ module PostgREST.AppState
, getObserver
, isLoaded
, isPending
, waitForSchemaCacheInit
) where

import qualified Data.ByteString.Char8 as BS
Expand All @@ -51,6 +52,9 @@ import Data.IORef (IORef, atomicWriteIORef, newIORef,
readIORef)
import Data.Time.Clock (UTCTime, getCurrentTime)

import Control.Concurrent.STM (TMVar, newEmptyTMVarIO,
putTMVar, readTMVar,
tryReadTMVar, tryTakeTMVar)
import PostgREST.Auth.JwtCache (JwtCacheState, update)
import PostgREST.Config (AppConfig (..),
readAppConfig,
Expand Down Expand Up @@ -98,9 +102,11 @@ data AppState = AppState
}

-- | Schema cache status.
-- Empty means pending and full means loaded.
-- Empty means initial loading on startup, False means pending and True means loaded.
Comment thread
steve-chavez marked this conversation as resolved.
-- "Initial" state is needed so that we can wait with application socket listening
-- until after initial schema cache querying.
newtype SchemaCacheStatus = SchemaCacheStatus
{ getSCStatusMVar :: MVar ()
{ getSCStatusTMVar :: TMVar Bool
}

init :: AppConfig -> IO AppState
Expand Down Expand Up @@ -368,16 +374,21 @@ retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThrea
oneSecondInUs = 1000000 -- one second in microseconds

newSchemaCacheStatus :: IO SchemaCacheStatus
newSchemaCacheStatus = SchemaCacheStatus <$> newEmptyMVar
newSchemaCacheStatus = SchemaCacheStatus <$> newEmptyTMVarIO

markSchemaCachePending :: AppState -> IO ()
markSchemaCachePending = void . tryTakeMVar . getSCStatusMVar . stateSCacheStatus
markSchemaCachePending = atomically . liftA2 (*>) tryTakeTMVar (`putTMVar` False) . getSCStatusTMVar . stateSCacheStatus

markSchemaCacheLoaded :: AppState -> IO ()
markSchemaCacheLoaded = void . (`tryPutMVar` ()) . getSCStatusMVar . stateSCacheStatus
markSchemaCacheLoaded = atomically . liftA2 (*>) tryTakeTMVar (`putTMVar` True) . getSCStatusTMVar . stateSCacheStatus

isSchemaCacheLoaded :: AppState -> IO Bool
isSchemaCacheLoaded = fmap not . isEmptyMVar . getSCStatusMVar . stateSCacheStatus
isSchemaCacheLoaded = atomically . (pure . fromMaybe False <=< tryReadTMVar) . getSCStatusTMVar . stateSCacheStatus

-- | Wait for initial schema cache load to either finish or retry
-- | We wait until scStatusTMVar is not empty.
waitForSchemaCacheInit :: AppState -> IO ()
waitForSchemaCacheInit = atomically . void . readTMVar . getSCStatusTMVar . stateSCacheStatus

-- | Reads the in-db config and reads the config file again
-- | We don't retry reading the in-db config after it fails immediately, because it could have user errors. We just report the error and continue.
Expand Down
4 changes: 3 additions & 1 deletion src/PostgREST/Config.hs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@
, configAdminServerUnixSocketMode :: FileMode
, configRoleSettings :: RoleSettings
, configRoleIsoLvl :: RoleIsolationLvl
, configInternalSCQuerySleep :: Maybe Int32
, configInternalSCQuerySleepFst :: Maybe Int32
, configInternalSCQuerySleepSnd :: Maybe Int32

Check warning on line 132 in src/PostgREST/Config.hs

View check run for this annotation

Codecov / codecov/patch

src/PostgREST/Config.hs#L131-L132

Added lines #L131 - L132 were not covered by tests
}

data LogLevel = LogCrit | LogError | LogWarn | LogInfo | LogDebug
Expand Down Expand Up @@ -332,6 +333,7 @@
<*> parseSocketFileMode "admin-server-unix-socket-mode"
<*> pure roleSettings
<*> pure roleIsolationLvl
<*> optInt "internal-schema-cache-query-sleep-before-queries"
<*> optInt "internal-schema-cache-query-sleep"
where
parseErrorVerbosity :: C.Key -> C.Parser C.Config Verbosity
Expand Down
9 changes: 6 additions & 3 deletions src/PostgREST/SchemaCache.hs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ maxDbTablesForFuzzySearch = 500
querySchemaCache :: AppConfig -> SQL.Transaction (SchemaCache, Maybe QueryTimings)
querySchemaCache conf@AppConfig{..} = do
SQL.sql "set local schema ''" -- This voids the search path. The following queries need this for getting the fully qualified name(schema.name) of every db object

for_ configInternalSCQuerySleepFst (`SQL.statement` sleepCall) -- only used for testing

tabs <- sqlTimedStmt gucTbls conf allTables
keyDeps <- sqlTimedStmt gucKDeps conf allViewsKeyDependencies
m2oRels <- sqlTimedStmt gucRels mempty allM2OandO2ORels
Expand All @@ -166,9 +169,8 @@ querySchemaCache conf@AppConfig{..} = do
tzones <- if configDbTimezoneEnabled
then sqlTimedStmt gucTzones mempty timezones
else pure S.empty
_ <-
let sleepCall = SQL.Statement "select pg_sleep($1 / 1000.0)" (param HE.int4) HD.noResult True in
for_ configInternalSCQuerySleep (`SQL.statement` sleepCall) -- only used for testing

for_ configInternalSCQuerySleepSnd (`SQL.statement` sleepCall) -- only used for testing

qsTime <-
if isLogDebug
Expand All @@ -195,6 +197,7 @@ querySchemaCache conf@AppConfig{..} = do
schemas = toList configDbSchemas
isLogDebug = configLogLevel == LogDebug
sqlTimedStmt = sqlTimedStatement isLogDebug
sleepCall = SQL.Statement "select pg_sleep($1 / 1000.0)" (param HE.int4) HD.noResult True

-- | overrides detected relationships with the computed relationships and gets the RelationshipsMap
getOverrideRelationshipsMap :: [Relationship] -> [Relationship] -> RelationshipsMap
Expand Down
40 changes: 28 additions & 12 deletions test/io/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import subprocess
import time
import pytest
import requests

from config import CONFIGSDIR, FIXTURES, SECRET
from util import (
Expand Down Expand Up @@ -1090,7 +1091,7 @@ def test_empty_schema_cache_log_contains_jwt_role(defaultenv):

env = {
**defaultenv,
"PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP": "1000",
"PGRST_DB_SCHEMAS": "non_existent_schema_aaaa",
"PGRST_JWT_SECRET": SECRET,
}
headers = jwtauthheader({"role": "postgrest_test_author"}, SECRET)
Expand Down Expand Up @@ -1543,14 +1544,19 @@ def test_log_postgrest_host_and_port(host, defaultenv):
with run(
env=defaultenv, host=host, port=port, no_startup_stdout=False
) as postgrest:
output = postgrest.read_stdout(nlines=10)
output = postgrest.read_stdout(nlines=11)

# Cannot assume a particular log entry order
# Listening on a socket happens after schema querying
# but is concurrent to the schema loading process
# and migh happen before or after writing of the
# "Schema cache loaded" log entry
if is_unix:
re.match(r'API server listening on "/tmp/.*\.sock"', output[2])
match_log(output, [r".*API server listening on .*/tmp/.*\.sock"])
elif is_ipv6(host):
assert f"API server listening on [{host}]:{port}" in output[2]
match_log(output, [r".*API server listening on \[.+]:\d+"])
else: # IPv4
assert f"API server listening on {host}:{port}" in output[2]
match_log(output, [r".*API server listening on .+:\d+"])


def test_succeed_w_role_having_superuser_settings(defaultenv):
Expand Down Expand Up @@ -1818,17 +1824,24 @@ def test_pgrst_log_503_client_error_to_stderr(defaultenv):
assert any(log_message in line for line in output)


def test_log_error_when_empty_schema_cache_on_startup_to_stderr(defaultenv):
"Should log the 503 error message when there is an empty schema cache on startup"
def test_log_error_when_schema_cache_load_error_on_startup_to_stderr(defaultenv):
"Should log the 503 error message when there is an error loading schema cache on startup"

env = {
**defaultenv,
"PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP": "300",
"PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP_BEFORE_QUERIES": "1000",
"PGRST_DB_SCHEMAS": "non_existent_schema_aaaa",
}

with run(env=env, wait_for=None) as postgrest:
postgrest.wait_until_scache_starts_loading()

# First call should fail with connection refused
with pytest.raises(requests.ConnectionError):
postgrest.session.get("/projects")

# Next call should return 503
time.sleep(1)
Comment thread
steve-chavez marked this conversation as resolved.
response = postgrest.session.get("/projects")
assert response.status_code == 503

Expand All @@ -1840,7 +1853,7 @@ def test_log_error_when_empty_schema_cache_on_startup_to_stderr(defaultenv):


def test_no_double_schema_cache_reload_on_empty_schema(defaultenv):
"Should only load the schema cache once on a 503 error when there's an empty schema cache on startup"
"Should only load the schema cache once when there's an empty schema cache on startup"

env = {
**defaultenv,
Expand All @@ -1850,12 +1863,15 @@ def test_no_double_schema_cache_reload_on_empty_schema(defaultenv):
with run(env=env, wait_for=None) as postgrest:
postgrest.wait_until_scache_starts_loading()

response = postgrest.session.get("/projects")
assert response.status_code == 503
with pytest.raises(requests.ConnectionError):
postgrest.session.get("/projects")
Comment thread
wolfgangwalther marked this conversation as resolved.

# Should wait enough time to load the schema cache twice to guarantee that the test is valid
time.sleep(1)

response = postgrest.session.get("/projects")
assert response.status_code == 200

Comment thread
steve-chavez marked this conversation as resolved.
response = postgrest.admin.get("/metrics")
assert response.status_code == 200
assert 'pgrst_schema_cache_loads_total{status="SUCCESS"} 1.0' in response.text
Expand Down Expand Up @@ -1937,7 +1953,7 @@ def test_schema_cache_error_observation(defaultenv):
output = postgrest.read_stdout(nlines=9)
assert (
"Failed to load the schema cache using db-schemas=public and db-extra-search-path=x"
in output[7]
in output[6]
)


Expand Down
3 changes: 2 additions & 1 deletion test/observability/ObsHelper.hs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ baseCfg = let secret = encodeUtf8 "reallyreallyreallyreallyverysafe" in
, configAdminServerUnixSocketMode = 432
, configRoleSettings = mempty
, configRoleIsoLvl = mempty
, configInternalSCQuerySleep = Nothing
, configInternalSCQuerySleepFst = Nothing
, configInternalSCQuerySleepSnd = Nothing
, configServerTimingEnabled = True
}

Expand Down
3 changes: 2 additions & 1 deletion test/spec/SpecHelper.hs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ baseCfg = let secret = encodeUtf8 "reallyreallyreallyreallyverysafe" in
, configAdminServerUnixSocketMode = 432
, configRoleSettings = mempty
, configRoleIsoLvl = mempty
, configInternalSCQuerySleep = Nothing
, configInternalSCQuerySleepFst = Nothing
, configInternalSCQuerySleepSnd = Nothing
, configServerTimingEnabled = True
}

Expand Down