Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adapter: refactor max connection handling #30624

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ use mz_sql::plan::{Plan, PlanNotice, StatementDesc};
use mz_sql::rbac;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SUPPORT_USER, SYSTEM_USER};
use mz_sql::session::vars::{ConnectionCounter, SystemVars};
use mz_sql::session::vars::SystemVars;
use mz_sql_parser::ast::QualifiedReplica;
use mz_storage_types::connections::inline::{ConnectionResolver, InlinedConnection};
use mz_storage_types::connections::ConnectionContext;
Expand Down Expand Up @@ -627,7 +627,6 @@ impl Catalog {
enable_expression_cache_override: Option<bool>,
) -> Result<Catalog, anyhow::Error> {
let metrics_registry = &MetricsRegistry::new();
let active_connection_count = Arc::new(std::sync::Mutex::new(ConnectionCounter::new(0, 0)));
let secrets_reader = Arc::new(InMemorySecretsController::new());
// Used as a lower boundary of the boot_ts, but it's ok to use now() for
// debugging/testing.
Expand Down Expand Up @@ -667,7 +666,6 @@ impl Catalog {
aws_privatelink_availability_zones: None,
http_host_name: None,
connection_context: ConnectionContext::for_tests(secrets_reader),
active_connection_count,
builtin_item_migration_config: BuiltinItemMigrationConfig::Legacy,
persist_client,
enable_expression_cache_override,
Expand Down Expand Up @@ -1278,6 +1276,10 @@ impl Catalog {
self.state.system_config()
}

pub fn system_config_mut(&mut self) -> &mut SystemVars {
self.state.system_config_mut()
}

pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
self.state.ensure_not_reserved_role(role_id)
}
Expand Down
14 changes: 6 additions & 8 deletions src/adapter/src/catalog/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,11 @@ impl Catalog {
);
}

let mut system_configuration = SystemVars::new().set_unsafe(config.unsafe_mode);
if config.all_features {
system_configuration.enable_all_feature_flags_by_default();
}

let mut state = CatalogState {
database_by_name: BTreeMap::new(),
database_by_id: BTreeMap::new(),
Expand All @@ -247,14 +252,7 @@ impl Catalog {
roles_by_id: BTreeMap::new(),
network_policies_by_id: BTreeMap::new(),
network_policies_by_name: BTreeMap::new(),
system_configuration: {
let mut s =
SystemVars::new(config.active_connection_count).set_unsafe(config.unsafe_mode);
if config.all_features {
s.enable_all_feature_flags_by_default();
}
s
},
system_configuration,
default_privileges: DefaultPrivileges::default(),
system_privileges: PrivilegeMap::default(),
comments: CommentsMap::default(),
Expand Down
5 changes: 5 additions & 0 deletions src/adapter/src/catalog/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2079,6 +2079,11 @@ impl CatalogState {
&self.system_configuration
}

/// Return a mutable reference to the current system configuration.
pub fn system_config_mut(&mut self) -> &mut SystemVars {
&mut self.system_configuration
}

/// Serializes the catalog's in-memory state.
///
/// There are no guarantees about the format of the serialized state, except
Expand Down
3 changes: 2 additions & 1 deletion src/adapter/src/config/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ impl SystemParameterBackend {
pub async fn push(&mut self, params: &mut SynchronizedParameters) {
for param in params.modified() {
let mut vars = BTreeMap::new();
info!(name = param.name, value = param.value, "updating parameter");
vars.insert(param.name.clone(), param.value.clone());
match self.session_client.set_system_vars(vars).await {
Ok(()) => {
info!(name = param.name, value = param.value, "sync parameter");
info!(name = param.name, value = param.value, "update success");
}
Err(error) => match error {
AdapterError::ReadOnly => {
Expand Down
41 changes: 36 additions & 5 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ use mz_controller::ControllerConfig;
use mz_controller_types::{ClusterId, ReplicaId, WatchSetId};
use mz_expr::{MapFilterProject, OptimizedMirRelationExpr, RowSetFinishing};
use mz_orchestrator::{OfflineReason, ServiceProcessMetrics};
use mz_ore::cast::{CastFrom, CastLossy};
use mz_ore::cast::{CastFrom, CastInto, CastLossy};
use mz_ore::channel::trigger::Trigger;
use mz_ore::future::TimeoutError;
use mz_ore::metrics::MetricsRegistry;
Expand Down Expand Up @@ -141,7 +141,7 @@ use mz_sql::plan::{
OnTimeoutAction, Params, QueryWhen,
};
use mz_sql::session::user::User;
use mz_sql::session::vars::{ConnectionCounter, SystemVars};
use mz_sql::session::vars::SystemVars;
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::ExplainStage;
use mz_storage_client::client::TimestamplessUpdate;
Expand Down Expand Up @@ -1003,7 +1003,7 @@ pub struct Config {
pub aws_account_id: Option<String>,
pub aws_privatelink_availability_zones: Option<Vec<String>>,
pub connection_context: ConnectionContext,
pub active_connection_count: Arc<Mutex<ConnectionCounter>>,
pub connection_limit_callback: Box<dyn Fn(u64, u64) -> () + Send + Sync + 'static>,
pub webhook_concurrency_limit: WebhookConcurrencyLimiter,
pub http_host_name: Option<String>,
pub tracing_handle: TracingHandle,
Expand Down Expand Up @@ -3706,8 +3706,8 @@ pub fn serve(
aws_account_id,
aws_privatelink_availability_zones,
connection_context,
connection_limit_callback,
remote_system_parameters,
active_connection_count,
webhook_concurrency_limit,
http_host_name,
tracing_handle,
Expand Down Expand Up @@ -3854,7 +3854,6 @@ pub fn serve(
aws_principal_context,
aws_privatelink_availability_zones,
connection_context,
active_connection_count,
http_host_name,
builtin_item_migration_config,
persist_client: persist_client.clone(),
Expand Down Expand Up @@ -3933,6 +3932,38 @@ pub fn serve(
pg_timestamp_oracle_params.apply(config);
}

// Register a callback so whenever the MAX_CONNECTIONS or SUPERUSER_RESERVED_CONNECTIONS
// system variables change, we update our connection limits.
let connection_limit_callback: Arc<dyn Fn(&SystemVars) + Send + Sync> =
Arc::new(move |system_vars: &SystemVars| {
let limit: u64 = system_vars.max_connections().cast_into();
let superuser_reserved: u64 =
system_vars.superuser_reserved_connections().cast_into();

// If superuser_reserved > max_connections, prefer max_connections.
//
// In this scenario all normal users would be locked out because all connections
// would be reserved for superusers so complain if this is the case.
let superuser_reserved = if superuser_reserved >= limit {
tracing::warn!(
"superuser_reserved ({superuser_reserved}) is greater than max connections ({limit})!"
);
limit
} else {
superuser_reserved
};

(connection_limit_callback)(limit, superuser_reserved);
});
catalog.system_config_mut().register_callback(
&mz_sql::session::vars::MAX_CONNECTIONS,
Arc::clone(&connection_limit_callback),
);
catalog.system_config_mut().register_callback(
&mz_sql::session::vars::SUPERUSER_RESERVED_CONNECTIONS,
connection_limit_callback,
);

let parent_span = tracing::Span::current();
let thread = thread::Builder::new()
// The Coordinator thread tends to keep a lot of data on its stack. To
Expand Down
6 changes: 3 additions & 3 deletions src/adapter/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -926,10 +926,10 @@ impl From<mz_sql_parser::ast::IdentError> for AdapterError {
}
}

impl From<mz_sql::session::vars::ConnectionError> for AdapterError {
fn from(value: mz_sql::session::vars::ConnectionError) -> Self {
impl From<mz_pgwire_common::ConnectionError> for AdapterError {
fn from(value: mz_pgwire_common::ConnectionError) -> Self {
match value {
mz_sql::session::vars::ConnectionError::TooManyConnections { current, limit } => {
mz_pgwire_common::ConnectionError::TooManyConnections { current, limit } => {
AdapterError::ResourceExhaustion {
resource_type: "connection".into(),
limit_name: "max_connections".into(),
Expand Down
4 changes: 1 addition & 3 deletions src/catalog-debug/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use std::fs::File;
use std::io::{self, Write};
use std::path::PathBuf;
use std::process;
use std::sync::Arc;
use std::sync::LazyLock;
use std::sync::{Arc, Mutex};
use std::time::Instant;

use anyhow::Context;
Expand Down Expand Up @@ -53,7 +53,6 @@ use mz_persist_client::{PersistClient, PersistLocation};
use mz_repr::{Diff, Timestamp};
use mz_service::secrets::SecretsReaderCliArgs;
use mz_sql::catalog::EnvironmentId;
use mz_sql::session::vars::ConnectionCounter;
use mz_storage_types::connections::ConnectionContext;
use serde::{Deserialize, Serialize};
use tracing::{error, Instrument};
Expand Down Expand Up @@ -608,7 +607,6 @@ async fn upgrade_check(
secrets_reader,
None,
),
active_connection_count: Arc::new(Mutex::new(ConnectionCounter::new(0, 0))),
builtin_item_migration_config: BuiltinItemMigrationConfig::Legacy,
persist_client,
enable_expression_cache_override: None,
Expand Down
4 changes: 0 additions & 4 deletions src/catalog/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
// by the Apache License, Version 2.0.

use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;

use bytesize::ByteSize;
use ipnet::IpNet;
Expand All @@ -22,7 +21,6 @@ use mz_persist_client::PersistClient;
use mz_repr::CatalogItemId;
use mz_sql::catalog::CatalogError as SqlCatalogError;
use mz_sql::catalog::EnvironmentId;
use mz_sql::session::vars::ConnectionCounter;
use serde::{Deserialize, Serialize};

use crate::durable::{CatalogError, DurableCatalogState};
Expand Down Expand Up @@ -84,8 +82,6 @@ pub struct StateConfig {
pub http_host_name: Option<String>,
/// Context for source and sink connections.
pub connection_context: mz_storage_types::connections::ConnectionContext,
/// Global connection limit and count
pub active_connection_count: Arc<std::sync::Mutex<ConnectionCounter>>,
pub builtin_item_migration_config: BuiltinItemMigrationConfig,
pub persist_client: PersistClient,
/// Overrides the current value of the [`mz_adapter_types::dyncfgs::ENABLE_EXPRESSION_CACHE`]
Expand Down
Loading