diff --git a/graph/src/components/metrics/registry.rs b/graph/src/components/metrics/registry.rs index e010d3a89fa..93cf51b3bd1 100644 --- a/graph/src/components/metrics/registry.rs +++ b/graph/src/components/metrics/registry.rs @@ -3,7 +3,7 @@ use std::sync::{Arc, RwLock}; use prometheus::IntGauge; use prometheus::{labels, Histogram, IntCounterVec}; -use slog::info; +use slog::debug; use crate::components::metrics::{counter_with_labels, gauge_with_labels}; use crate::prelude::Collector; @@ -133,7 +133,7 @@ impl MetricsRegistry { let mut result = self.registry.register(collector.clone()); if matches!(result, Err(PrometheusError::AlreadyReg)) { - info!(logger, "Resolving duplicate metric registration"); + debug!(logger, "Resolving duplicate metric registration"); // Since the current metric is a duplicate, // we can use it to unregister the previous registration. @@ -144,7 +144,6 @@ impl MetricsRegistry { match result { Ok(()) => { - info!(logger, "Successfully registered a new metric"); self.registered_metrics.inc(); } Err(err) => { diff --git a/graph/src/components/store/err.rs b/graph/src/components/store/err.rs index 6af676f8e52..76be7c311ce 100644 --- a/graph/src/components/store/err.rs +++ b/graph/src/components/store/err.rs @@ -141,7 +141,7 @@ impl Clone for StoreError { } impl StoreError { - fn from_diesel_error(e: &DieselError) -> Option { + pub fn from_diesel_error(e: &DieselError) -> Option { const CONN_CLOSE: &str = "server closed the connection unexpectedly"; const STMT_TIMEOUT: &str = "canceling statement due to statement timeout"; let DieselError::DatabaseError(_, info) = e else { diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 27cb3768e2c..73cb22269fe 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -655,7 +655,7 @@ pub trait QueryStore: Send + Sync { block_hash: &BlockHash, ) -> Result, Option)>, StoreError>; - fn wait_stats(&self) -> Result; + fn wait_stats(&self) -> PoolWaitStats; /// Find the current state for the subgraph deployment `id` and /// return details about it needed for executing queries @@ -668,7 +668,7 @@ pub trait QueryStore: Send + Sync { fn network_name(&self) -> &str; /// A permit should be acquired before starting query execution. - async fn query_permit(&self) -> Result; + async fn query_permit(&self) -> QueryPermit; /// Report the name of the shard in which the subgraph is stored. This /// should only be used for reporting and monitoring @@ -683,7 +683,7 @@ pub trait QueryStore: Send + Sync { #[async_trait] pub trait StatusStore: Send + Sync + 'static { /// A permit should be acquired before starting query execution. - async fn query_permit(&self) -> Result; + async fn query_permit(&self) -> QueryPermit; fn status(&self, filter: status::Filter) -> Result, StoreError>; diff --git a/graph/src/data/query/trace.rs b/graph/src/data/query/trace.rs index cf2d153dca4..256c9cdeaf6 100644 --- a/graph/src/data/query/trace.rs +++ b/graph/src/data/query/trace.rs @@ -118,11 +118,8 @@ impl Trace { } } - pub fn query_done(&mut self, dur: Duration, permit: &Result) { - let permit_dur = match permit { - Ok(permit) => permit.wait, - Err(_) => Duration::from_millis(0), - }; + pub fn query_done(&mut self, dur: Duration, permit: &QueryPermit) { + let permit_dur = permit.wait; match self { Trace::None => { /* nothing to do */ } Trace::Root { .. 
} => { diff --git a/graph/src/task_spawn.rs b/graph/src/task_spawn.rs index 09055ad5381..dd1477bb1c8 100644 --- a/graph/src/task_spawn.rs +++ b/graph/src/task_spawn.rs @@ -57,10 +57,11 @@ pub fn block_on(f: impl Future03) -> T { } /// Spawns a thread with access to the tokio runtime. Panics if the thread cannot be spawned. -pub fn spawn_thread( - name: impl Into, - f: impl 'static + FnOnce() + Send, -) -> std::thread::JoinHandle<()> { +pub fn spawn_thread(name: impl Into, f: F) -> std::thread::JoinHandle +where + F: 'static + FnOnce() -> R + Send, + R: 'static + Send, +{ let conf = std::thread::Builder::new().name(name.into()); let runtime = tokio::runtime::Handle::current(); conf.spawn(move || { diff --git a/graphql/src/execution/resolver.rs b/graphql/src/execution/resolver.rs index ca59e401dfc..0074eb124d8 100644 --- a/graphql/src/execution/resolver.rs +++ b/graphql/src/execution/resolver.rs @@ -18,7 +18,7 @@ use super::Query; pub trait Resolver: Sized + Send + Sync + 'static { const CACHEABLE: bool; - async fn query_permit(&self) -> Result; + async fn query_permit(&self) -> QueryPermit; /// Prepare for executing a query by prefetching as much data as possible fn prefetch( diff --git a/graphql/src/introspection/resolver.rs b/graphql/src/introspection/resolver.rs index 0f67b717c5a..765b0399695 100644 --- a/graphql/src/introspection/resolver.rs +++ b/graphql/src/introspection/resolver.rs @@ -356,7 +356,7 @@ impl Resolver for IntrospectionResolver { // see `fn as_introspection_context`, so this value is irrelevant. const CACHEABLE: bool = false; - async fn query_permit(&self) -> Result { + async fn query_permit(&self) -> QueryPermit { unreachable!() } diff --git a/graphql/src/runner.rs b/graphql/src/runner.rs index 96f30e8bc9d..d2f0bc9c96c 100644 --- a/graphql/src/runner.rs +++ b/graphql/src/runner.rs @@ -143,7 +143,7 @@ where )?; self.load_manager .decide( - &store.wait_stats().map_err(QueryExecutionError::from)?, + &store.wait_stats(), store.shard(), store.deployment_id(), query.shape_hash, diff --git a/graphql/src/store/resolver.rs b/graphql/src/store/resolver.rs index 82c40420fa6..d7032740768 100644 --- a/graphql/src/store/resolver.rs +++ b/graphql/src/store/resolver.rs @@ -256,8 +256,8 @@ impl StoreResolver { impl Resolver for StoreResolver { const CACHEABLE: bool = true; - async fn query_permit(&self) -> Result { - self.store.query_permit().await.map_err(Into::into) + async fn query_permit(&self) -> QueryPermit { + self.store.query_permit().await } fn prefetch( diff --git a/node/src/bin/manager.rs b/node/src/bin/manager.rs index 5142a2ab939..50ee9b61958 100644 --- a/node/src/bin/manager.rs +++ b/node/src/bin/manager.rs @@ -898,7 +898,7 @@ impl Context { fn primary_pool(self) -> ConnectionPool { let primary = self.config.primary_store(); - let coord = Arc::new(PoolCoordinator::new(Arc::new(vec![]))); + let coord = Arc::new(PoolCoordinator::new(&self.logger, Arc::new(vec![]))); let pool = StoreBuilder::main_pool( &self.logger, &self.node_id, diff --git a/node/src/store_builder.rs b/node/src/store_builder.rs index 7fadf6b92c2..5294179f8eb 100644 --- a/node/src/store_builder.rs +++ b/node/src/store_builder.rs @@ -1,15 +1,15 @@ use std::iter::FromIterator; use std::{collections::HashMap, sync::Arc}; -use graph::futures03::future::join_all; use graph::prelude::{o, MetricsRegistry, NodeId}; +use graph::slog::warn; use graph::url::Url; use graph::{ prelude::{info, CheapClone, Logger}, util::security::SafeDisplay, }; use graph_store_postgres::connection_pool::{ - ConnectionPool, ForeignServer, 
PoolCoordinator, PoolName, + ConnectionPool, ForeignServer, PoolCoordinator, PoolRole, }; use graph_store_postgres::{ BlockStore as DieselBlockStore, ChainHeadUpdateListener as PostgresChainHeadUpdateListener, @@ -62,7 +62,7 @@ impl StoreBuilder { // attempt doesn't work for all of them because the database is // unavailable, they will try again later in the normal course of // using the pool - join_all(pools.values().map(|pool| pool.setup())).await; + coord.setup_all(logger).await; let chains = HashMap::from_iter(config.chains.chains.iter().map(|(name, chain)| { let shard = ShardName::new(chain.shard.to_string()) @@ -111,13 +111,28 @@ impl StoreBuilder { .collect::, _>>() .expect("connection url's contain enough detail"); let servers = Arc::new(servers); - let coord = Arc::new(PoolCoordinator::new(servers)); + let coord = Arc::new(PoolCoordinator::new(logger, servers)); let shards: Vec<_> = config .stores .iter() - .map(|(name, shard)| { + .filter_map(|(name, shard)| { let logger = logger.new(o!("shard" => name.to_string())); + let pool_size = shard.pool_size.size_for(node, name).unwrap_or_else(|_| { + panic!("cannot determine the pool size for store {}", name) + }); + if pool_size == 0 { + if name == PRIMARY_SHARD.as_str() { + panic!("pool size for primary shard must be greater than 0"); + } else { + warn!( + logger, + "pool size for shard {} is 0, ignoring this shard", name + ); + return None; + } + } + let conn_pool = Self::main_pool( &logger, node, @@ -138,7 +153,7 @@ impl StoreBuilder { let name = ShardName::new(name.to_string()).expect("shard names have been validated"); - (name, conn_pool, read_only_conn_pools, weights) + Some((name, conn_pool, read_only_conn_pools, weights)) }) .collect(); @@ -196,8 +211,8 @@ impl StoreBuilder { Arc::new(DieselStore::new(subgraph_store, block_store)) } - /// Create a connection pool for the main database of the primary shard - /// without connecting to all the other configured databases + /// Create a connection pool for the main (non-replica) database of a + /// shard pub fn main_pool( logger: &Logger, node: &NodeId, @@ -225,7 +240,7 @@ impl StoreBuilder { coord.create_pool( &logger, name, - PoolName::Main, + PoolRole::Main, shard.connection.clone(), pool_size, Some(fdw_pool_size), @@ -265,7 +280,7 @@ impl StoreBuilder { coord.clone().create_pool( &logger, name, - PoolName::Replica(pool), + PoolRole::Replica(pool), replica.connection.clone(), pool_size, None, diff --git a/server/index-node/src/resolver.rs b/server/index-node/src/resolver.rs index a60e5d35fd9..7974afe41db 100644 --- a/server/index-node/src/resolver.rs +++ b/server/index-node/src/resolver.rs @@ -777,8 +777,8 @@ fn entity_changes_to_graphql(entity_changes: Vec) -> r::Value { impl Resolver for IndexNodeResolver { const CACHEABLE: bool = false; - async fn query_permit(&self) -> Result { - self.store.query_permit().await.map_err(Into::into) + async fn query_permit(&self) -> QueryPermit { + self.store.query_permit().await } fn prefetch( diff --git a/store/postgres/src/advisory_lock.rs b/store/postgres/src/advisory_lock.rs index bd60d34c634..85e2cf5a4ae 100644 --- a/store/postgres/src/advisory_lock.rs +++ b/store/postgres/src/advisory_lock.rs @@ -6,7 +6,7 @@ //! has more details on advisory locks. //! //! We use the following 64 bit locks: -//! * 1,2: to synchronize on migratons +//! * 1: to synchronize on migratons //! //! We use the following 2x 32-bit locks //! 
* 1, n: to lock copying of the deployment with id n in the destination @@ -69,17 +69,31 @@ const COPY: Scope = Scope { id: 1 }; const WRITE: Scope = Scope { id: 2 }; const PRUNE: Scope = Scope { id: 3 }; -/// Get a lock for running migrations. Blocks until we get the lock. -pub(crate) fn lock_migration(conn: &mut PgConnection) -> Result<(), StoreError> { - sql_query("select pg_advisory_lock(1)").execute(conn)?; +/// Block until we can get the migration lock, then run `f` and unlock when +/// it is done. This is used to make sure that only one node runs setup at a +/// time. +pub(crate) async fn with_migration_lock( + conn: &mut PgConnection, + f: F, +) -> Result +where + F: FnOnce(&mut PgConnection) -> Fut, + Fut: std::future::Future>, +{ + fn execute(conn: &mut PgConnection, query: &str, msg: &str) -> Result<(), StoreError> { + sql_query(query).execute(conn).map(|_| ()).map_err(|e| { + StoreError::from_diesel_error(&e) + .unwrap_or_else(|| StoreError::Unknown(anyhow::anyhow!("{}: {}", msg, e))) + }) + } - Ok(()) -} + const LOCK: &str = "select pg_advisory_lock(1)"; + const UNLOCK: &str = "select pg_advisory_unlock(1)"; -/// Release the migration lock. -pub(crate) fn unlock_migration(conn: &mut PgConnection) -> Result<(), StoreError> { - sql_query("select pg_advisory_unlock(1)").execute(conn)?; - Ok(()) + execute(conn, LOCK, "failed to acquire migration lock")?; + let res = f(conn).await; + execute(conn, UNLOCK, "failed to release migration lock")?; + res } /// Take the lock used to keep two copy operations to run simultaneously on diff --git a/store/postgres/src/block_store.rs b/store/postgres/src/block_store.rs index 9af40b8d2a0..f69267fff17 100644 --- a/store/postgres/src/block_store.rs +++ b/store/postgres/src/block_store.rs @@ -319,11 +319,7 @@ impl BlockStore { } pub(crate) async fn query_permit_primary(&self) -> QueryPermit { - self.mirror - .primary() - .query_permit() - .await - .expect("the primary is never disabled") + self.mirror.primary().query_permit().await } pub fn allocate_chain( diff --git a/store/postgres/src/connection_pool.rs b/store/postgres/src/connection_pool.rs index ace5cddd719..6ff46649494 100644 --- a/store/postgres/src/connection_pool.rs +++ b/store/postgres/src/connection_pool.rs @@ -10,6 +10,9 @@ use diesel_migrations::{EmbeddedMigrations, HarnessWithOutput}; use graph::cheap_clone::CheapClone; use graph::components::store::QueryPermit; use graph::constraint_violation; +use graph::derive::CheapClone; +use graph::futures03::future::join_all; +use graph::futures03::FutureExt as _; use graph::prelude::tokio::time::Instant; use graph::prelude::{tokio, MetricsRegistry}; use graph::slog::warn; @@ -33,8 +36,9 @@ use std::{collections::HashMap, sync::RwLock}; use postgres::config::{Config, Host}; -use crate::primary::{self, Mirror, NAMESPACE_PUBLIC}; -use crate::{advisory_lock, catalog}; +use crate::advisory_lock::with_migration_lock; +use crate::catalog; +use crate::primary::{self, Mirror, Namespace, NAMESPACE_PUBLIC}; use crate::{Shard, PRIMARY_SHARD}; /// Tables that we map from the primary into `primary_public` in each shard @@ -309,7 +313,15 @@ impl ForeignServer { /// them on idle. This is much shorter than the default of 10 minutes. 
const FDW_IDLE_TIMEOUT: Duration = Duration::from_secs(60); -/// A pool goes through several states, and this enum tracks what state we +enum PoolStateInner { + /// A connection pool, and all the servers for which we need to + /// establish fdw mappings when we call `setup` on the pool + Created(Arc, Arc), + /// The pool has been successfully set up + Ready(Arc), +} + +/// A pool goes through several states, and this struct tracks what state we /// are in, together with the `state_tracker` field on `ConnectionPool`. /// When first created, the pool is in state `Created`; once we successfully /// called `setup` on it, it moves to state `Ready`. During use, we use the @@ -319,20 +331,95 @@ const FDW_IDLE_TIMEOUT: Duration = Duration::from_secs(60); /// database connection. That avoids overall undesirable states like buildup /// of queries; instead of queueing them until the database is available, /// they return almost immediately with an error -enum PoolState { - /// A connection pool, and all the servers for which we need to - /// establish fdw mappings when we call `setup` on the pool - Created(Arc, Arc), - /// The pool has been successfully set up - Ready(Arc), - /// The pool has been disabled by setting its size to 0 - Disabled, +#[derive(Clone, CheapClone)] +struct PoolState { + logger: Logger, + inner: Arc>, } +impl PoolState { + fn new(logger: Logger, inner: PoolStateInner, name: String) -> Self { + let pool_name = format!("pool-{}", name); + Self { + logger, + inner: Arc::new(TimedMutex::new(inner, pool_name)), + } + } + + fn created(pool: Arc, coord: Arc) -> Self { + let logger = pool.logger.clone(); + let name = pool.shard.to_string(); + let inner = PoolStateInner::Created(pool, coord); + Self::new(logger, inner, name) + } + + fn ready(pool: Arc) -> Self { + let logger = pool.logger.clone(); + let name = pool.shard.to_string(); + let inner = PoolStateInner::Ready(pool); + Self::new(logger, inner, name) + } + + fn set_ready(&self) { + use PoolStateInner::*; + + let mut guard = self.inner.lock(&self.logger); + match &*guard { + Created(pool, _) => *guard = Ready(pool.clone()), + Ready(_) => { /* nothing to do */ } + } + } + + /// Get a connection pool that is ready, i.e., has been through setup + /// and running migrations + fn get_ready(&self) -> Result, StoreError> { + // We have to be careful here that we do not hold a lock when we + // call `setup_bg`, otherwise we will deadlock + let (pool, coord) = { + let guard = self.inner.lock(&self.logger); + + use PoolStateInner::*; + match &*guard { + Created(pool, coord) => (pool.cheap_clone(), coord.cheap_clone()), + Ready(pool) => return Ok(pool.clone()), + } + }; + + // self is `Created` and needs to have setup run + coord.setup_bg(self.cheap_clone())?; + + // We just tried to set up the pool; if it is still not set up and + // we didn't have an error, it means the database is not available + if self.needs_setup() { + return Err(StoreError::DatabaseUnavailable); + } else { + Ok(pool) + } + } + + /// Get the inner pool, regardless of whether it has been set up or not. 
+ /// Most uses should use `get_ready` instead + fn get_unready(&self) -> Arc { + use PoolStateInner::*; + + match &*self.inner.lock(&self.logger) { + Created(pool, _) | Ready(pool) => pool.cheap_clone(), + } + } + + fn needs_setup(&self) -> bool { + let guard = self.inner.lock(&self.logger); + + use PoolStateInner::*; + match &*guard { + Created(_, _) => true, + Ready(_) => false, + } + } +} #[derive(Clone)] pub struct ConnectionPool { - inner: Arc>, - logger: Logger, + inner: PoolState, pub shard: Shard, state_tracker: PoolStateTracker, } @@ -345,27 +432,27 @@ impl fmt::Debug for ConnectionPool { } } -/// The name of the pool, mostly for logging, and what purpose it serves. +/// The role of the pool, mostly for logging, and what purpose it serves. /// The main pool will always be called `main`, and can be used for reading /// and writing. Replica pools can only be used for reading, and don't /// require any setup (migrations etc.) -pub enum PoolName { +pub enum PoolRole { Main, Replica(String), } -impl PoolName { +impl PoolRole { fn as_str(&self) -> &str { match self { - PoolName::Main => "main", - PoolName::Replica(name) => name, + PoolRole::Main => "main", + PoolRole::Replica(name) => name, } } fn is_replica(&self) -> bool { match self { - PoolName::Main => false, - PoolName::Replica(_) => true, + PoolRole::Main => false, + PoolRole::Replica(_) => true, } } } @@ -414,7 +501,7 @@ impl PoolStateTracker { impl ConnectionPool { fn create( shard_name: &str, - pool_name: PoolName, + pool_name: PoolRole, postgres_url: String, pool_size: u32, fdw_pool_size: Option, @@ -425,30 +512,25 @@ impl ConnectionPool { let state_tracker = PoolStateTracker::new(); let shard = Shard::new(shard_name.to_string()).expect("shard_name is a valid name for a shard"); - let pool_state = { - if pool_size == 0 { - PoolState::Disabled + let inner = { + let pool = PoolInner::create( + shard.clone(), + pool_name.as_str(), + postgres_url, + pool_size, + fdw_pool_size, + logger, + registry, + state_tracker.clone(), + ); + if pool_name.is_replica() { + PoolState::ready(Arc::new(pool)) } else { - let pool = PoolInner::create( - shard.clone(), - pool_name.as_str(), - postgres_url, - pool_size, - fdw_pool_size, - logger, - registry, - state_tracker.clone(), - ); - if pool_name.is_replica() { - PoolState::Ready(Arc::new(pool)) - } else { - PoolState::Created(Arc::new(pool), coord) - } + PoolState::created(Arc::new(pool), coord) } }; ConnectionPool { - inner: Arc::new(TimedMutex::new(pool_state, format!("pool-{}", shard_name))), - logger: logger.clone(), + inner, shard, state_tracker, } @@ -457,11 +539,7 @@ impl ConnectionPool { /// This is only used for `graphman` to ensure it doesn't run migrations /// or other setup steps pub fn skip_setup(&self) { - let mut guard = self.inner.lock(&self.logger); - match &*guard { - PoolState::Created(pool, _) => *guard = PoolState::Ready(pool.clone()), - PoolState::Ready(_) | PoolState::Disabled => { /* nothing to do */ } - } + self.inner.set_ready(); } /// Return a pool that is ready, i.e., connected to the database. 
If the @@ -469,7 +547,6 @@ impl ConnectionPool { /// or the pool is marked as unavailable, return /// `StoreError::DatabaseUnavailable` fn get_ready(&self) -> Result, StoreError> { - let mut guard = self.inner.lock(&self.logger); if !self.state_tracker.is_available() { // We know that trying to use this pool is pointless since the // database is not available, and will only lead to other @@ -478,16 +555,12 @@ impl ConnectionPool { return Err(StoreError::DatabaseUnavailable); } - match &*guard { - PoolState::Created(pool, servers) => { - pool.setup(servers.clone())?; - let pool2 = pool.clone(); - *guard = PoolState::Ready(pool.clone()); + match self.inner.get_ready() { + Ok(pool) => { self.state_tracker.mark_available(); - Ok(pool2) + Ok(pool) } - PoolState::Ready(pool) => Ok(pool.clone()), - PoolState::Disabled => Err(StoreError::DatabaseDisabled), + Err(e) => Err(e), } } @@ -580,53 +653,18 @@ impl ConnectionPool { .ignore_timeout(|| inner.try_get_fdw(logger, timeout)) } - pub fn connection_detail(&self) -> Result { - let pool = self.get_ready()?; - ForeignServer::new(pool.shard.clone(), &pool.postgres_url).map_err(|e| e.into()) - } - - /// Check that we can connect to the database - pub fn check(&self) -> bool { - true - } - - /// Setup the database for this pool. This includes configuring foreign - /// data wrappers for cross-shard communication, and running any pending - /// schema migrations for this database. - /// - /// # Panics - /// - /// If any errors happen during the migration, the process panics - pub async fn setup(&self) { - let pool = self.clone(); - graph::spawn_blocking_allow_panic(move || { - pool.get_ready().ok(); - }) - .await - // propagate panics - .unwrap(); - } - - pub(crate) async fn query_permit(&self) -> Result { - let pool = match &*self.inner.lock(&self.logger) { - PoolState::Created(pool, _) | PoolState::Ready(pool) => pool.clone(), - PoolState::Disabled => { - return Err(StoreError::DatabaseDisabled); - } - }; + pub(crate) async fn query_permit(&self) -> QueryPermit { + let pool = self.inner.get_unready(); let start = Instant::now(); let permit = pool.query_permit().await; - Ok(QueryPermit { + QueryPermit { permit, wait: start.elapsed(), - }) + } } - pub(crate) fn wait_stats(&self) -> Result { - match &*self.inner.lock(&self.logger) { - PoolState::Created(pool, _) | PoolState::Ready(pool) => Ok(pool.wait_stats.clone()), - PoolState::Disabled => Err(StoreError::DatabaseDisabled), - } + pub(crate) fn wait_stats(&self) -> PoolWaitStats { + self.inner.get_unready().wait_stats.cheap_clone() } /// Mirror key tables from the primary into our own schema. We do this @@ -1027,20 +1065,6 @@ impl PoolInner { self.pool.get().map_err(|_| StoreError::DatabaseUnavailable) } - pub fn get_with_timeout_warning( - &self, - logger: &Logger, - ) -> Result>, StoreError> { - loop { - match self.pool.get_timeout(ENV_VARS.store.connection_timeout) { - Ok(conn) => return Ok(conn), - Err(e) => error!(logger, "Error checking out connection, retrying"; - "error" => brief_error_msg(&e), - ), - } - } - } - /// Get the pool for fdw connections. It is an error if none is configured fn fdw_pool( &self, @@ -1120,71 +1144,25 @@ impl PoolInner { .unwrap_or(false) } - /// Setup the database for this pool. This includes configuring foreign - /// data wrappers for cross-shard communication, and running any pending - /// schema migrations for this database. - /// - /// Returns `StoreError::DatabaseUnavailable` if we can't connect to the - /// database. Any other error causes a panic. 
- /// - /// # Panics - /// - /// If any errors happen during the migration, the process panics - fn setup(&self, coord: Arc) -> Result<(), StoreError> { - fn die(logger: &Logger, msg: &'static str, err: &dyn std::fmt::Display) -> ! { - crit!(logger, "{}", msg; "error" => format!("{:#}", err)); - panic!("{}: {}", msg, err); - } - - let pool = self.clone(); - let mut conn = self.get().map_err(|_| StoreError::DatabaseUnavailable)?; - - let start = Instant::now(); - - advisory_lock::lock_migration(&mut conn) - .unwrap_or_else(|err| die(&pool.logger, "failed to get migration lock", &err)); - // This code can cause a race in database setup: if pool A has had - // schema changes and pool B then tries to map tables from pool A, - // but does so before the concurrent thread running this code for - // pool B has at least finished `configure_fdw`, mapping tables will - // fail. In that case, the node must be restarted. The restart is - // guaranteed because this failure will lead to a panic in the setup - // for pool A - // - // This code can also leave the table mappings in a state where they - // have not been updated if the process is killed after migrating - // the schema but before finishing remapping in all shards. - // Addressing that would require keeping track of the need to remap - // in the database instead of just in memory - let result = pool - .configure_fdw(coord.servers.as_ref()) - .and_then(|()| pool.drop_cross_shard_views()) - .and_then(|()| migrate_schema(&pool.logger, &mut conn)); - debug!(&pool.logger, "Release migration lock"); - advisory_lock::unlock_migration(&mut conn).unwrap_or_else(|err| { - die(&pool.logger, "failed to release migration lock", &err); - }); - let result = result - .and_then(|count| coord.propagate(&pool, count)) - .and_then(|()| pool.create_cross_shard_views(coord.servers.as_ref())); - result.unwrap_or_else(|err| die(&pool.logger, "migrations failed", &err)); - - // Locale check - if let Err(msg) = catalog::Locale::load(&mut conn)?.suitable() { - if &self.shard == &*PRIMARY_SHARD && primary::is_empty(&mut conn)? { - die( - &pool.logger, + fn locale_check( + &self, + logger: &Logger, + mut conn: PooledConnection>, + ) -> Result<(), StoreError> { + Ok( + if let Err(msg) = catalog::Locale::load(&mut conn)?.suitable() { + if &self.shard == &*PRIMARY_SHARD && primary::is_empty(&mut conn)? { + const MSG: &str = "Database does not use C locale. \ - Please check the graph-node documentation for how to set up the database locale", - &msg, - ); - } else { - warn!(pool.logger, "{}.\nPlease check the graph-node documentation for how to set up the database locale", msg); - } - } + Please check the graph-node documentation for how to set up the database locale"; - debug!(&pool.logger, "Setup finished"; "setup_time_s" => start.elapsed().as_secs()); - Ok(()) + crit!(logger, "{}: {}", MSG, msg); + panic!("{}: {}", MSG, msg); + } else { + warn!(logger, "{}.\nPlease check the graph-node documentation for how to set up the database locale", msg); + } + }, + ) } pub(crate) async fn query_permit(&self) -> tokio::sync::OwnedSemaphorePermit { @@ -1214,6 +1192,28 @@ impl PoolInner { }) } + /// Do the part of database setup that only affects this pool. Those + /// steps are + /// 1. Configuring foreign servers and user mappings for talking to the + /// other shards + /// 2. Migrating the schema to the latest version + /// 3. 
Checking that the locale is set to C + async fn migrate( + self: Arc, + servers: &[ForeignServer], + ) -> Result { + self.configure_fdw(servers)?; + let mut conn = self.get()?; + let (this, count) = conn.transaction(|conn| -> Result<_, StoreError> { + let count = migrate_schema(&self.logger, conn)?; + Ok((self, count)) + })?; + + this.locale_check(&this.logger, conn)?; + + Ok(count) + } + /// If this is the primary shard, drop the namespace `CROSS_SHARD_NSP` fn drop_cross_shard_views(&self) -> Result<(), StoreError> { if self.shard != *PRIMARY_SHARD { @@ -1257,14 +1257,17 @@ impl PoolInner { return Ok(()); } - info!(&self.logger, "Creating cross-shard views"); let mut conn = self.get()?; + let sharded = Namespace::special(ForeignServer::CROSS_SHARD_NSP); + if catalog::has_namespace(&mut conn, &sharded)? { + // We dropped the namespace before, but another node must have + // recreated it in the meantime so we don't need to do anything + return Ok(()); + } + info!(&self.logger, "Creating cross-shard views"); conn.transaction(|conn| { - let query = format!( - "create schema if not exists {}", - ForeignServer::CROSS_SHARD_NSP - ); + let query = format!("create schema {}", ForeignServer::CROSS_SHARD_NSP); conn.batch_execute(&query)?; for (src_nsp, src_tables) in SHARDED_TABLES { // Pairs of (shard, nsp) for all servers @@ -1379,11 +1382,6 @@ fn migrate_schema(logger: &Logger, conn: &mut PgConnection) -> Result Result>>, + logger: Logger, + pools: Mutex>, servers: Arc>, } impl PoolCoordinator { - pub fn new(servers: Arc>) -> Self { + pub fn new(logger: &Logger, servers: Arc>) -> Self { + let logger = logger.new(o!("component" => "ConnectionPool", "component" => "Coordinator")); Self { + logger, pools: Mutex::new(HashMap::new()), servers, } @@ -1411,7 +1412,7 @@ impl PoolCoordinator { self: Arc, logger: &Logger, name: &str, - pool_name: PoolName, + pool_name: PoolRole, postgres_url: String, pool_size: u32, fdw_pool_size: Option, @@ -1433,20 +1434,12 @@ impl PoolCoordinator { // Ignore non-writable pools (replicas), there is no need (and no // way) to coordinate schema changes with them if is_writable { - // It is safe to take this lock here since nobody has seen the pool - // yet. We remember the `PoolInner` so that later, when we have to - // call `remap()`, we do not have to take this lock as that will be - // already held in `get_ready()` - match &*pool.inner.lock(logger) { - PoolState::Created(inner, _) | PoolState::Ready(inner) => { - self.pools - .lock() - .unwrap() - .insert(pool.shard.clone(), inner.clone()); - } - PoolState::Disabled => { /* nothing to do */ } - } + self.pools + .lock() + .unwrap() + .insert(pool.shard.clone(), pool.inner.cheap_clone()); } + pool } @@ -1478,13 +1471,8 @@ impl PoolCoordinator { if count.had_migrations() { let server = self.server(&pool.shard)?; for pool in self.pools.lock().unwrap().values() { - let mut conn = pool.get()?; - let remap_res = { - advisory_lock::lock_migration(&mut conn)?; - let res = pool.remap(server); - advisory_lock::unlock_migration(&mut conn)?; - res - }; + let pool = pool.get_unready(); + let remap_res = pool.remap(server); if let Err(e) = remap_res { error!(pool.logger, "Failed to map imports from {}", server.shard; "error" => e.to_string()); return Err(e); @@ -1494,8 +1482,15 @@ impl PoolCoordinator { Ok(()) } + /// Return a list of all pools, regardless of whether they are ready or + /// not. 
pub fn pools(&self) -> Vec> { - self.pools.lock().unwrap().values().cloned().collect() + self.pools + .lock() + .unwrap() + .values() + .map(|state| state.get_unready()) + .collect::>() } pub fn servers(&self) -> Arc> { @@ -1508,4 +1503,188 @@ impl PoolCoordinator { .find(|server| &server.shard == shard) .ok_or_else(|| constraint_violation!("unknown shard {shard}")) } + + fn primary(&self) -> Result, StoreError> { + let map = self.pools.lock().unwrap(); + let pool_state = map.get(&*&PRIMARY_SHARD).ok_or_else(|| { + constraint_violation!("internal error: primary shard not found in pool coordinator") + })?; + + Ok(pool_state.get_unready()) + } + + /// Setup all pools the coordinator knows about and return the number of + /// pools that were successfully set up. + /// + /// # Panics + /// + /// If any errors besides a database not being available happen during + /// the migration, the process panics + pub async fn setup_all(&self, logger: &Logger) -> usize { + let pools = self + .pools + .lock() + .unwrap() + .values() + .cloned() + .collect::>(); + + let res = self.setup(pools).await; + + match res { + Ok(count) => { + info!(logger, "Setup finished"; "shards" => count); + count + } + Err(e) => { + crit!(logger, "database setup failed"; "error" => format!("{e}")); + panic!("database setup failed: {}", e); + } + } + } + + /// A helper to call `setup` from a non-async context. Returns `true` if + /// the setup was actually run, i.e. if `pool` was available + fn setup_bg(self: Arc, pool: PoolState) -> Result { + let migrated = graph::spawn_thread("database-setup", move || { + graph::block_on(self.setup(vec![pool.clone()])) + }) + .join() + // unwrap: propagate panics + .unwrap()?; + Ok(migrated == 1) + } + + /// Setup all pools by doing the following steps: + /// 1. Get the migration lock in the primary. This makes sure that only + /// one node runs migrations + /// 2. Remove the views in `sharded` as they might interfere with + /// running migrations + /// 3. In parallel, do the following in each pool: + /// 1. Configure fdw servers + /// 2. Run migrations in all pools in parallel + /// 4. In parallel, do the following in each pool: + /// 1. Create/update the mappings in `shard__subgraphs` and in + /// `primary_public` + /// 5. Create the views in `sharded` again + /// 6. Release the migration lock + /// + /// This method tolerates databases that are not available and will + /// simply ignore them. The returned count is the number of pools that + /// were successfully set up. + /// + /// When this method returns, the entries from `states` that were + /// successfully set up will be marked as ready. The method returns the + /// number of pools that were set up + async fn setup(&self, states: Vec) -> Result { + type MigrationCounts = Vec<(PoolState, MigrationCount)>; + + /// Filter out pools that are not available. We don't want to fail + /// because one of the pools is not available. We will just ignore + /// them and continue with the others. 
+ fn filter_unavailable( + (state, res): (PoolState, Result), + ) -> Option> { + if let Err(StoreError::DatabaseUnavailable) = res { + error!( + state.logger, + "migrations failed because database was unavailable" + ); + None + } else { + Some(res.map(|count| (state, count))) + } + } + + /// Migrate all pools in parallel + async fn migrate( + pools: &[PoolState], + servers: &[ForeignServer], + ) -> Result { + let futures = pools + .iter() + .map(|state| { + state + .get_unready() + .cheap_clone() + .migrate(servers) + .map(|res| (state.cheap_clone(), res)) + }) + .collect::>(); + join_all(futures) + .await + .into_iter() + .filter_map(filter_unavailable) + .collect::, _>>() + } + + /// Propagate the schema changes to all other pools in parallel + async fn propagate( + this: &PoolCoordinator, + migrated: MigrationCounts, + ) -> Result, StoreError> { + let futures = migrated + .into_iter() + .map(|(state, count)| async move { + let pool = state.get_unready(); + let res = this.propagate(&pool, count); + (state.cheap_clone(), res) + }) + .collect::>(); + join_all(futures) + .await + .into_iter() + .filter_map(filter_unavailable) + .map(|res| res.map(|(state, ())| state)) + .collect::, _>>() + } + + let primary = self.primary()?; + + let mut pconn = primary.get().map_err(|_| StoreError::DatabaseUnavailable)?; + + let states: Vec<_> = states + .into_iter() + .filter(|pool| pool.needs_setup()) + .collect(); + if states.is_empty() { + return Ok(0); + } + + // Everything here happens under the migration lock. Anything called + // from here should not try to get that lock, otherwise the process + // will deadlock + debug!(self.logger, "Waiting for migration lock"); + let res = with_migration_lock(&mut pconn, |_| async { + debug!(self.logger, "Migration lock acquired"); + + // While we were waiting for the migration lock, another thread + // might have already run this + let states: Vec<_> = states + .into_iter() + .filter(|pool| pool.needs_setup()) + .collect(); + if states.is_empty() { + debug!(self.logger, "No pools to set up"); + return Ok(0); + } + + primary.drop_cross_shard_views()?; + + let migrated = migrate(&states, self.servers.as_ref()).await?; + + let propagated = propagate(&self, migrated).await?; + + primary.create_cross_shard_views(&self.servers)?; + + for state in &propagated { + state.set_ready(); + } + Ok(propagated.len()) + }) + .await; + debug!(self.logger, "Database setup finished"); + + res + } } diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index 96dd5507f3e..91230d63b7b 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -415,7 +415,7 @@ impl DeploymentStore { Ok(conn) } - pub(crate) async fn query_permit(&self, replica: ReplicaId) -> Result { + pub(crate) async fn query_permit(&self, replica: ReplicaId) -> QueryPermit { let pool = match replica { ReplicaId::Main => &self.pool, ReplicaId::ReadOnly(idx) => &self.read_only_pools[idx], @@ -423,7 +423,7 @@ impl DeploymentStore { pool.query_permit().await } - pub(crate) fn wait_stats(&self, replica: ReplicaId) -> Result { + pub(crate) fn wait_stats(&self, replica: ReplicaId) -> PoolWaitStats { match replica { ReplicaId::Main => self.pool.wait_stats(), ReplicaId::ReadOnly(idx) => self.read_only_pools[idx].wait_stats(), diff --git a/store/postgres/src/primary.rs b/store/postgres/src/primary.rs index 5ec81dcbd61..f329ae4bba2 100644 --- a/store/postgres/src/primary.rs +++ b/store/postgres/src/primary.rs @@ -266,6 +266,13 @@ impl Namespace { 
Namespace(format!("prune{id}")) } + /// A namespace that is not a deployment namespace. This is used for + /// special namespaces we use. No checking is done on `s` and the caller + /// must ensure it's a valid namespace name + pub fn special(s: impl Into) -> Self { + Namespace(s.into()) + } + pub fn as_str(&self) -> &str { &self.0 } diff --git a/store/postgres/src/query_store.rs b/store/postgres/src/query_store.rs index 8fc2da822e4..fe7d084030b 100644 --- a/store/postgres/src/query_store.rs +++ b/store/postgres/src/query_store.rs @@ -112,7 +112,7 @@ impl QueryStoreTrait for QueryStore { self.chain_store.block_numbers(block_hashes).await } - fn wait_stats(&self) -> Result { + fn wait_stats(&self) -> PoolWaitStats { self.store.wait_stats(self.replica_id) } @@ -137,7 +137,7 @@ impl QueryStoreTrait for QueryStore { &self.site.network } - async fn query_permit(&self) -> Result { + async fn query_permit(&self) -> QueryPermit { self.store.query_permit(self.replica_id).await } diff --git a/store/postgres/src/store.rs b/store/postgres/src/store.rs index 50a5e4b21e0..7eb428a5058 100644 --- a/store/postgres/src/store.rs +++ b/store/postgres/src/store.rs @@ -167,8 +167,8 @@ impl StatusStore for Store { .await } - async fn query_permit(&self) -> Result { + async fn query_permit(&self) -> QueryPermit { // Status queries go to the primary shard. - Ok(self.block_store.query_permit_primary().await) + self.block_store.query_permit_primary().await } } diff --git a/store/test-store/tests/graphql/introspection.rs b/store/test-store/tests/graphql/introspection.rs index 6139e673767..8bc76213e6b 100644 --- a/store/test-store/tests/graphql/introspection.rs +++ b/store/test-store/tests/graphql/introspection.rs @@ -53,15 +53,15 @@ impl Resolver for MockResolver { Ok(r::Value::Null) } - async fn query_permit(&self) -> Result { + async fn query_permit(&self) -> QueryPermit { let permit = Arc::new(tokio::sync::Semaphore::new(1)) .acquire_owned() .await .unwrap(); - Ok(QueryPermit { + QueryPermit { permit, wait: Duration::from_secs(0), - }) + } } } diff --git a/tests/src/config.rs b/tests/src/config.rs index 09e3b55fa47..54b07b0a5a8 100644 --- a/tests/src/config.rs +++ b/tests/src/config.rs @@ -210,7 +210,6 @@ impl Config { let setup = format!( r#" create extension pg_trgm; - create extension pg_stat_statements; create extension btree_gist; create extension postgres_fdw; grant usage on foreign data wrapper postgres_fdw to "{}";