Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store startup fixes #5926

Merged
merged 13 commits into from
Apr 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions graph/src/components/metrics/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::{Arc, RwLock};

use prometheus::IntGauge;
use prometheus::{labels, Histogram, IntCounterVec};
use slog::info;
use slog::debug;

use crate::components::metrics::{counter_with_labels, gauge_with_labels};
use crate::prelude::Collector;
Expand Down Expand Up @@ -133,7 +133,7 @@ impl MetricsRegistry {
let mut result = self.registry.register(collector.clone());

if matches!(result, Err(PrometheusError::AlreadyReg)) {
info!(logger, "Resolving duplicate metric registration");
debug!(logger, "Resolving duplicate metric registration");

// Since the current metric is a duplicate,
// we can use it to unregister the previous registration.
Expand All @@ -144,7 +144,6 @@ impl MetricsRegistry {

match result {
Ok(()) => {
info!(logger, "Successfully registered a new metric");
self.registered_metrics.inc();
}
Err(err) => {
Expand Down
2 changes: 1 addition & 1 deletion graph/src/components/store/err.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ impl Clone for StoreError {
}

impl StoreError {
fn from_diesel_error(e: &DieselError) -> Option<Self> {
pub fn from_diesel_error(e: &DieselError) -> Option<Self> {
const CONN_CLOSE: &str = "server closed the connection unexpectedly";
const STMT_TIMEOUT: &str = "canceling statement due to statement timeout";
let DieselError::DatabaseError(_, info) = e else {
Expand Down
6 changes: 3 additions & 3 deletions graph/src/components/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ pub trait QueryStore: Send + Sync {
block_hash: &BlockHash,
) -> Result<Option<(BlockNumber, Option<u64>, Option<BlockHash>)>, StoreError>;

fn wait_stats(&self) -> Result<PoolWaitStats, StoreError>;
fn wait_stats(&self) -> PoolWaitStats;

/// Find the current state for the subgraph deployment `id` and
/// return details about it needed for executing queries
Expand All @@ -668,7 +668,7 @@ pub trait QueryStore: Send + Sync {
fn network_name(&self) -> &str;

/// A permit should be acquired before starting query execution.
async fn query_permit(&self) -> Result<QueryPermit, StoreError>;
async fn query_permit(&self) -> QueryPermit;

/// Report the name of the shard in which the subgraph is stored. This
/// should only be used for reporting and monitoring
Expand All @@ -683,7 +683,7 @@ pub trait QueryStore: Send + Sync {
#[async_trait]
pub trait StatusStore: Send + Sync + 'static {
/// A permit should be acquired before starting query execution.
async fn query_permit(&self) -> Result<QueryPermit, StoreError>;
async fn query_permit(&self) -> QueryPermit;

fn status(&self, filter: status::Filter) -> Result<Vec<status::Info>, StoreError>;

Expand Down
7 changes: 2 additions & 5 deletions graph/src/data/query/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,8 @@ impl Trace {
}
}

pub fn query_done(&mut self, dur: Duration, permit: &Result<QueryPermit, QueryExecutionError>) {
let permit_dur = match permit {
Ok(permit) => permit.wait,
Err(_) => Duration::from_millis(0),
};
pub fn query_done(&mut self, dur: Duration, permit: &QueryPermit) {
let permit_dur = permit.wait;
match self {
Trace::None => { /* nothing to do */ }
Trace::Root { .. } => {
Expand Down
9 changes: 5 additions & 4 deletions graph/src/task_spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,11 @@ pub fn block_on<T>(f: impl Future03<Output = T>) -> T {
}

/// Spawns a thread with access to the tokio runtime. Panics if the thread cannot be spawned.
pub fn spawn_thread(
name: impl Into<String>,
f: impl 'static + FnOnce() + Send,
) -> std::thread::JoinHandle<()> {
pub fn spawn_thread<F, R>(name: impl Into<String>, f: F) -> std::thread::JoinHandle<R>
where
F: 'static + FnOnce() -> R + Send,
R: 'static + Send,
{
let conf = std::thread::Builder::new().name(name.into());
let runtime = tokio::runtime::Handle::current();
conf.spawn(move || {
Expand Down
2 changes: 1 addition & 1 deletion graphql/src/execution/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use super::Query;
pub trait Resolver: Sized + Send + Sync + 'static {
const CACHEABLE: bool;

async fn query_permit(&self) -> Result<QueryPermit, QueryExecutionError>;
async fn query_permit(&self) -> QueryPermit;

/// Prepare for executing a query by prefetching as much data as possible
fn prefetch(
Expand Down
2 changes: 1 addition & 1 deletion graphql/src/introspection/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ impl Resolver for IntrospectionResolver {
// see `fn as_introspection_context`, so this value is irrelevant.
const CACHEABLE: bool = false;

async fn query_permit(&self) -> Result<QueryPermit, QueryExecutionError> {
async fn query_permit(&self) -> QueryPermit {
unreachable!()
}

Expand Down
2 changes: 1 addition & 1 deletion graphql/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ where
)?;
self.load_manager
.decide(
&store.wait_stats().map_err(QueryExecutionError::from)?,
&store.wait_stats(),
store.shard(),
store.deployment_id(),
query.shape_hash,
Expand Down
4 changes: 2 additions & 2 deletions graphql/src/store/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,8 @@ impl StoreResolver {
impl Resolver for StoreResolver {
const CACHEABLE: bool = true;

async fn query_permit(&self) -> Result<QueryPermit, QueryExecutionError> {
self.store.query_permit().await.map_err(Into::into)
async fn query_permit(&self) -> QueryPermit {
self.store.query_permit().await
}

fn prefetch(
Expand Down
2 changes: 1 addition & 1 deletion node/src/bin/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -898,7 +898,7 @@ impl Context {

fn primary_pool(self) -> ConnectionPool {
let primary = self.config.primary_store();
let coord = Arc::new(PoolCoordinator::new(Arc::new(vec![])));
let coord = Arc::new(PoolCoordinator::new(&self.logger, Arc::new(vec![])));
let pool = StoreBuilder::main_pool(
&self.logger,
&self.node_id,
Expand Down
35 changes: 25 additions & 10 deletions node/src/store_builder.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use std::iter::FromIterator;
use std::{collections::HashMap, sync::Arc};

use graph::futures03::future::join_all;
use graph::prelude::{o, MetricsRegistry, NodeId};
use graph::slog::warn;
use graph::url::Url;
use graph::{
prelude::{info, CheapClone, Logger},
util::security::SafeDisplay,
};
use graph_store_postgres::connection_pool::{
ConnectionPool, ForeignServer, PoolCoordinator, PoolName,
ConnectionPool, ForeignServer, PoolCoordinator, PoolRole,
};
use graph_store_postgres::{
BlockStore as DieselBlockStore, ChainHeadUpdateListener as PostgresChainHeadUpdateListener,
Expand Down Expand Up @@ -62,7 +62,7 @@ impl StoreBuilder {
// attempt doesn't work for all of them because the database is
// unavailable, they will try again later in the normal course of
// using the pool
join_all(pools.values().map(|pool| pool.setup())).await;
coord.setup_all(logger).await;

let chains = HashMap::from_iter(config.chains.chains.iter().map(|(name, chain)| {
let shard = ShardName::new(chain.shard.to_string())
Expand Down Expand Up @@ -111,13 +111,28 @@ impl StoreBuilder {
.collect::<Result<Vec<_>, _>>()
.expect("connection url's contain enough detail");
let servers = Arc::new(servers);
let coord = Arc::new(PoolCoordinator::new(servers));
let coord = Arc::new(PoolCoordinator::new(logger, servers));

let shards: Vec<_> = config
.stores
.iter()
.map(|(name, shard)| {
.filter_map(|(name, shard)| {
let logger = logger.new(o!("shard" => name.to_string()));
let pool_size = shard.pool_size.size_for(node, name).unwrap_or_else(|_| {
panic!("cannot determine the pool size for store {}", name)
});
if pool_size == 0 {
if name == PRIMARY_SHARD.as_str() {
panic!("pool size for primary shard must be greater than 0");
} else {
warn!(
logger,
"pool size for shard {} is 0, ignoring this shard", name
);
return None;
}
}

let conn_pool = Self::main_pool(
&logger,
node,
Expand All @@ -138,7 +153,7 @@ impl StoreBuilder {

let name =
ShardName::new(name.to_string()).expect("shard names have been validated");
(name, conn_pool, read_only_conn_pools, weights)
Some((name, conn_pool, read_only_conn_pools, weights))
})
.collect();

Expand Down Expand Up @@ -196,8 +211,8 @@ impl StoreBuilder {
Arc::new(DieselStore::new(subgraph_store, block_store))
}

/// Create a connection pool for the main database of the primary shard
/// without connecting to all the other configured databases
/// Create a connection pool for the main (non-replica) database of a
/// shard
pub fn main_pool(
logger: &Logger,
node: &NodeId,
Expand Down Expand Up @@ -225,7 +240,7 @@ impl StoreBuilder {
coord.create_pool(
&logger,
name,
PoolName::Main,
PoolRole::Main,
shard.connection.clone(),
pool_size,
Some(fdw_pool_size),
Expand Down Expand Up @@ -265,7 +280,7 @@ impl StoreBuilder {
coord.clone().create_pool(
&logger,
name,
PoolName::Replica(pool),
PoolRole::Replica(pool),
replica.connection.clone(),
pool_size,
None,
Expand Down
4 changes: 2 additions & 2 deletions server/index-node/src/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -777,8 +777,8 @@ fn entity_changes_to_graphql(entity_changes: Vec<EntityOperation>) -> r::Value {
impl<S: Store> Resolver for IndexNodeResolver<S> {
const CACHEABLE: bool = false;

async fn query_permit(&self) -> Result<QueryPermit, QueryExecutionError> {
self.store.query_permit().await.map_err(Into::into)
async fn query_permit(&self) -> QueryPermit {
self.store.query_permit().await
}

fn prefetch(
Expand Down
34 changes: 24 additions & 10 deletions store/postgres/src/advisory_lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//! has more details on advisory locks.
//!
//! We use the following 64 bit locks:
//! * 1,2: to synchronize on migrations
//! * 1: to synchronize on migrations
//!
//! We use the following 2x 32-bit locks
//! * 1, n: to lock copying of the deployment with id n in the destination
Expand Down Expand Up @@ -69,17 +69,31 @@ const COPY: Scope = Scope { id: 1 };
const WRITE: Scope = Scope { id: 2 };
const PRUNE: Scope = Scope { id: 3 };

/// Get a lock for running migrations. Blocks until we get the lock.
pub(crate) fn lock_migration(conn: &mut PgConnection) -> Result<(), StoreError> {
sql_query("select pg_advisory_lock(1)").execute(conn)?;
/// Block until we can get the migration lock, then run `f` and unlock when
/// it is done. This is used to make sure that only one node runs setup at a
/// time.
pub(crate) async fn with_migration_lock<F, Fut, R>(
conn: &mut PgConnection,
f: F,
) -> Result<R, StoreError>
where
F: FnOnce(&mut PgConnection) -> Fut,
Fut: std::future::Future<Output = Result<R, StoreError>>,
{
fn execute(conn: &mut PgConnection, query: &str, msg: &str) -> Result<(), StoreError> {
sql_query(query).execute(conn).map(|_| ()).map_err(|e| {
StoreError::from_diesel_error(&e)
.unwrap_or_else(|| StoreError::Unknown(anyhow::anyhow!("{}: {}", msg, e)))
})
}

Ok(())
}
const LOCK: &str = "select pg_advisory_lock(1)";
const UNLOCK: &str = "select pg_advisory_unlock(1)";

/// Release the migration lock.
pub(crate) fn unlock_migration(conn: &mut PgConnection) -> Result<(), StoreError> {
sql_query("select pg_advisory_unlock(1)").execute(conn)?;
Ok(())
execute(conn, LOCK, "failed to acquire migration lock")?;
let res = f(conn).await;
execute(conn, UNLOCK, "failed to release migration lock")?;
res
}

/// Take the lock used to keep two copy operations from running simultaneously on
Expand Down
6 changes: 1 addition & 5 deletions store/postgres/src/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,11 +319,7 @@ impl BlockStore {
}

pub(crate) async fn query_permit_primary(&self) -> QueryPermit {
self.mirror
.primary()
.query_permit()
.await
.expect("the primary is never disabled")
self.mirror.primary().query_permit().await
}

pub fn allocate_chain(
Expand Down
Loading
Loading