diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 354528e2cde5..6df2bb41bc75 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -855,6 +855,14 @@ impl ComputeNode { info!("Storage auth token not set"); } + config.application_name("compute_ctl"); + if let Some(spec) = &compute_state.pspec { + config.options(&format!( + "-c neon.compute_mode={}", + spec.spec.mode.to_type_str() + )); + } + // Connect to pageserver let mut client = config.connect(NoTls)?; let pageserver_connect_micros = start_time.elapsed().as_micros() as u64; diff --git a/compute_tools/src/config.rs b/compute_tools/src/config.rs index 0760568ff822..ea7797a548f1 100644 --- a/compute_tools/src/config.rs +++ b/compute_tools/src/config.rs @@ -99,6 +99,7 @@ pub fn write_postgres_conf( writeln!(file, "lc_numeric='C.UTF-8'")?; } + writeln!(file, "neon.compute_mode={}", spec.mode.to_type_str())?; match spec.mode { ComputeMode::Primary => {} ComputeMode::Static(lsn) => { diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs index 77f2e1e631aa..2b166d0f113c 100644 --- a/libs/compute_api/src/spec.rs +++ b/libs/compute_api/src/spec.rs @@ -272,6 +272,18 @@ pub enum ComputeMode { Replica, } +impl ComputeMode { + /// Convert the compute mode to a string that can be used to identify the type of compute, + /// which means that if it's a static compute, the LSN will not be included. + pub fn to_type_str(&self) -> &'static str { + match self { + ComputeMode::Primary => "primary", + ComputeMode::Static(_) => "static", + ComputeMode::Replica => "replica", + } + } +} + /// Log level for audit logging /// Disabled, log, hipaa /// Default is Disabled diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index f2d2ab05ad96..1268de24258e 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -237,7 +237,7 @@ pub async fn libpq_listener_main( type ConnectionHandlerResult = anyhow::Result<()>; -#[instrument(skip_all, fields(peer_addr, application_name))] +#[instrument(skip_all, fields(peer_addr, application_name, compute_mode))] #[allow(clippy::too_many_arguments)] async fn page_service_conn_main( conf: &'static PageServerConf, @@ -2505,6 +2505,58 @@ impl PageServiceCmd { } } +/// Parse the startup options from the postgres wire protocol startup packet. +/// +/// It takes a sequence of `-c option=X` or `-coption=X`. It parses the options string +/// by best effort and returns all the options parsed (key-value pairs) and a bool indicating +/// whether all options are successfully parsed. There could be duplicates in the options +/// if the caller passed such parameters. +fn parse_options(options: &str) -> (Vec<(String, String)>, bool) { + let mut parsing_config = false; + let mut has_error = false; + let mut config = Vec::new(); + for item in options.split_whitespace() { + if item == "-c" { + if !parsing_config { + parsing_config = true; + } else { + // "-c" followed with another "-c" + tracing::warn!("failed to parse the startup options: {options}"); + has_error = true; + break; + } + } else if item.starts_with("-c") || parsing_config { + let Some((mut key, value)) = item.split_once('=') else { + // "-c" followed with an invalid option + tracing::warn!("failed to parse the startup options: {options}"); + has_error = true; + break; + }; + if !parsing_config { + // Parse "-coptions=X" + let Some(stripped_key) = key.strip_prefix("-c") else { + tracing::warn!("failed to parse the startup options: {options}"); + has_error = true; + break; + }; + key = stripped_key; + } + config.push((key.to_string(), value.to_string())); + parsing_config = false; + } else { + tracing::warn!("failed to parse the startup options: {options}"); + has_error = true; + break; + } + } + if parsing_config { + // "-c" without the option + tracing::warn!("failed to parse the startup options: {options}"); + has_error = true; + } + (config, has_error) +} + impl postgres_backend::Handler for PageServerHandler where IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static, @@ -2549,6 +2601,14 @@ where if let Some(app_name) = params.get("application_name") { Span::current().record("application_name", field::display(app_name)); } + if let Some(options) = params.get("options") { + let (config, _) = parse_options(options); + for (key, value) in config { + if key == "neon.compute_mode" { + Span::current().record("compute_mode", field::display(value)); + } + } + } }; Ok(()) @@ -2662,6 +2722,7 @@ where PageServiceCmd::Set => { // important because psycopg2 executes "SET datestyle TO 'ISO'" // on connect + // TODO: allow setting options, i.e., application_name/compute_mode via SET commands pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; } PageServiceCmd::LeaseLsn(LeaseLsnCmd { @@ -2936,4 +2997,46 @@ mod tests { let cmd = PageServiceCmd::parse(&format!("lease {tenant_id} {timeline_id} gzip 0/16ABCDE")); assert!(cmd.is_err()); } + + #[test] + fn test_parse_options() { + let (config, has_error) = parse_options(" -c neon.compute_mode=primary "); + assert!(!has_error); + assert_eq!( + config, + vec![("neon.compute_mode".to_string(), "primary".to_string())] + ); + + let (config, has_error) = parse_options(" -c neon.compute_mode=primary -c foo=bar "); + assert!(!has_error); + assert_eq!( + config, + vec![ + ("neon.compute_mode".to_string(), "primary".to_string()), + ("foo".to_string(), "bar".to_string()), + ] + ); + + let (config, has_error) = parse_options(" -c neon.compute_mode=primary -cfoo=bar"); + assert!(!has_error); + assert_eq!( + config, + vec![ + ("neon.compute_mode".to_string(), "primary".to_string()), + ("foo".to_string(), "bar".to_string()), + ] + ); + + let (_, has_error) = parse_options("-c"); + assert!(has_error); + + let (_, has_error) = parse_options("-c foo=bar -c -c"); + assert!(has_error); + + let (_, has_error) = parse_options(" "); + assert!(!has_error); + + let (_, has_error) = parse_options(" -c neon.compute_mode"); + assert!(has_error); + } } diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index 637281fe4a7a..a71de34702d1 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -50,6 +50,20 @@ #define MIN_RECONNECT_INTERVAL_USEC 1000 #define MAX_RECONNECT_INTERVAL_USEC 1000000 + +enum NeonComputeMode { + CP_MODE_PRIMARY = 0, + CP_MODE_REPLICA, + CP_MODE_STATIC +}; + +static const struct config_enum_entry neon_compute_modes[] = { + {"primary", CP_MODE_PRIMARY, false}, + {"replica", CP_MODE_REPLICA, false}, + {"static", CP_MODE_STATIC, false}, + {NULL, 0, false} +}; + /* GUCs */ char *neon_timeline; char *neon_tenant; @@ -62,6 +76,7 @@ int flush_every_n_requests = 8; int neon_protocol_version = 2; +static int neon_compute_mode = 0; static int max_reconnect_attempts = 60; static int stripe_size; @@ -390,9 +405,10 @@ pageserver_connect(shardno_t shard_no, int elevel) { case PS_Disconnected: { - const char *keywords[4]; - const char *values[4]; - char pid_str[16]; + const char *keywords[5]; + const char *values[5]; + char pid_str[16] = { 0 }; + char endpoint_str[36] = { 0 }; int n_pgsql_params; TimestampTz now; int64 us_since_last_attempt; @@ -464,6 +480,31 @@ pageserver_connect(shardno_t shard_no, int elevel) n_pgsql_params++; } + { + bool param_set = false; + switch (neon_compute_mode) + { + case CP_MODE_PRIMARY: + strncpy(endpoint_str, "-c neon.compute_mode=primary", sizeof(endpoint_str)); + param_set = true; + break; + case CP_MODE_REPLICA: + strncpy(endpoint_str, "-c neon.compute_mode=replica", sizeof(endpoint_str)); + param_set = true; + break; + case CP_MODE_STATIC: + strncpy(endpoint_str, "-c neon.compute_mode=static", sizeof(endpoint_str)); + param_set = true; + break; + } + if (param_set) + { + keywords[n_pgsql_params] = "options"; + values[n_pgsql_params] = endpoint_str; + n_pgsql_params++; + } + } + keywords[n_pgsql_params] = NULL; values[n_pgsql_params] = NULL; @@ -1370,6 +1411,17 @@ pg_init_libpagestore(void) GUC_UNIT_MS, NULL, NULL, NULL); + DefineCustomEnumVariable( + "neon.compute_mode", + "The compute endpoint node type", + NULL, + &neon_compute_mode, + CP_MODE_PRIMARY, + neon_compute_modes, + PGC_POSTMASTER, + 0, + NULL, NULL, NULL); + relsize_hash_init(); if (page_server != NULL)