From d60fa408c4c22d9ca8a1e0375c3d3c81562a01f7 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Mon, 17 Mar 2025 18:31:37 -0400 Subject: [PATCH 1/4] feat(compute_ctl): pass compute type to pageserver with pg_options Signed-off-by: Alex Chi Z --- compute_tools/src/compute.rs | 8 +++++ compute_tools/src/config.rs | 1 + libs/compute_api/src/spec.rs | 12 +++++++ pageserver/src/page_service.rs | 37 ++++++++++++++++++++- pgxn/neon/libpagestore.c | 59 ++++++++++++++++++++++++++++++++-- 5 files changed, 114 insertions(+), 3 deletions(-) diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 354528e2cde5..a9d2c87679df 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -855,6 +855,14 @@ impl ComputeNode { info!("Storage auth token not set"); } + config.application_name("compute_ctl"); + if let Some(spec) = &compute_state.pspec { + config.options(&format!( + "-c neon.endpoint_type={}", + spec.spec.mode.to_type_str() + )); + } + // Connect to pageserver let mut client = config.connect(NoTls)?; let pageserver_connect_micros = start_time.elapsed().as_micros() as u64; diff --git a/compute_tools/src/config.rs b/compute_tools/src/config.rs index 0760568ff822..1de221c4355d 100644 --- a/compute_tools/src/config.rs +++ b/compute_tools/src/config.rs @@ -99,6 +99,7 @@ pub fn write_postgres_conf( writeln!(file, "lc_numeric='C.UTF-8'")?; } + writeln!(file, "neon.endpoint_type={}", spec.mode.to_type_str())?; match spec.mode { ComputeMode::Primary => {} ComputeMode::Static(lsn) => { diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs index 77f2e1e631aa..2b166d0f113c 100644 --- a/libs/compute_api/src/spec.rs +++ b/libs/compute_api/src/spec.rs @@ -272,6 +272,18 @@ pub enum ComputeMode { Replica, } +impl ComputeMode { + /// Convert the compute mode to a string that can be used to identify the type of compute, + /// which means that if it's a static compute, the LSN will not be included. + pub fn to_type_str(&self) -> &'static str { + match self { + ComputeMode::Primary => "primary", + ComputeMode::Static(_) => "static", + ComputeMode::Replica => "replica", + } + } +} + /// Log level for audit logging /// Disabled, log, hipaa /// Default is Disabled diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index f2d2ab05ad96..51b54afb9da6 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -237,7 +237,7 @@ pub async fn libpq_listener_main( type ConnectionHandlerResult = anyhow::Result<()>; -#[instrument(skip_all, fields(peer_addr, application_name))] +#[instrument(skip_all, fields(peer_addr, application_name, endpoint_type))] #[allow(clippy::too_many_arguments)] async fn page_service_conn_main( conf: &'static PageServerConf, @@ -2549,6 +2549,40 @@ where if let Some(app_name) = params.get("application_name") { Span::current().record("application_name", field::display(app_name)); } + if let Some(options) = params.get("options") { + let mut parsing_config = false; + for item in options.split_whitespace() { + if item == "-c" { + if !parsing_config { + parsing_config = true; + } else { + // "-c" followed with another "-c" + tracing::warn!("failed to parse the startup options: {options}"); + break; + } + } else if parsing_config { + let Some((key, value)) = item.split_once('=') else { + // "-c" followed with an invalid option + tracing::warn!("failed to parse the startup options: {options}"); + break; + }; + if key == "neon.endpoint_type" { + Span::current().record("endpoint_type", field::display(value)); + } else { + tracing::warn!("failed to parse the startup options: {options}"); + break; + } + parsing_config = false; + } else { + tracing::warn!("failed to parse the startup options: {options}"); + break; + } + } + if parsing_config { + // "-c" without the option + tracing::warn!("failed to parse the startup options: {options}"); + } + } }; Ok(()) @@ -2662,6 +2696,7 @@ where PageServiceCmd::Set => { // important because psycopg2 executes "SET datestyle TO 'ISO'" // on connect + // TODO: allow setting options, i.e., application_name/endpoint_type via SET commands pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; } PageServiceCmd::LeaseLsn(LeaseLsnCmd { diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index 637281fe4a7a..e493e8af36db 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -50,6 +50,20 @@ #define MIN_RECONNECT_INTERVAL_USEC 1000 #define MAX_RECONNECT_INTERVAL_USEC 1000000 + +enum NeonEndpointType { + EP_TYPE_PRIMARY = 0, + EP_TYPE_REPLICA, + EP_TYPE_STATIC +}; + +static const struct config_enum_entry neon_endpoint_types[] = { + {"primary", EP_TYPE_PRIMARY, false}, + {"replica", EP_TYPE_REPLICA, false}, + {"static", EP_TYPE_STATIC, false}, + {NULL, 0, false} +}; + /* GUCs */ char *neon_timeline; char *neon_tenant; @@ -62,6 +76,7 @@ int flush_every_n_requests = 8; int neon_protocol_version = 2; +static int neon_endpoint_type = 0; static int max_reconnect_attempts = 60; static int stripe_size; @@ -390,9 +405,10 @@ pageserver_connect(shardno_t shard_no, int elevel) { case PS_Disconnected: { - const char *keywords[4]; - const char *values[4]; + const char *keywords[5]; + const char *values[5]; char pid_str[16]; + char endpoint_str[36]; int n_pgsql_params; TimestampTz now; int64 us_since_last_attempt; @@ -464,6 +480,34 @@ pageserver_connect(shardno_t shard_no, int elevel) n_pgsql_params++; } + { + int ret = 0; + bool param_set = false; + switch (neon_endpoint_type) + { + case EP_TYPE_PRIMARY: + ret = snprintf(endpoint_str, sizeof(endpoint_str), "-c neon.endpoint_type=primary"); + param_set = true; + break; + case EP_TYPE_REPLICA: + ret = snprintf(endpoint_str, sizeof(endpoint_str), "-c neon.endpoint_type=replica"); + param_set = true; + break; + case EP_TYPE_STATIC: + ret = snprintf(endpoint_str, sizeof(endpoint_str), "-c neon.endpoint_type=static"); + param_set = true; + break; + } + if (ret < 0 || ret >= (int)(sizeof(endpoint_str))) + elog(FATAL, "stack-allocated buffer too small to hold endpoint type"); + if (param_set) + { + keywords[n_pgsql_params] = "options"; + values[n_pgsql_params] = endpoint_str; + n_pgsql_params++; + } + } + keywords[n_pgsql_params] = NULL; values[n_pgsql_params] = NULL; @@ -1370,6 +1414,17 @@ pg_init_libpagestore(void) GUC_UNIT_MS, NULL, NULL, NULL); + DefineCustomEnumVariable( + "neon.endpoint_type", + "The compute endpoint node type", + NULL, + &neon_endpoint_type, + EP_TYPE_PRIMARY, + neon_endpoint_types, + PGC_POSTMASTER, + 0, + NULL, NULL, NULL); + relsize_hash_init(); if (page_server != NULL) From 792263a10ce845f6285ce12dcc0ffba154cc2c9f Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Tue, 18 Mar 2025 16:29:10 -0400 Subject: [PATCH 2/4] fix parsing; resolve comments Signed-off-by: Alex Chi Z --- pageserver/src/page_service.rs | 12 ++++++++++-- pgxn/neon/libpagestore.c | 13 +++++-------- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 51b54afb9da6..a5c4f9fe9c07 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -2560,12 +2560,20 @@ where tracing::warn!("failed to parse the startup options: {options}"); break; } - } else if parsing_config { - let Some((key, value)) = item.split_once('=') else { + } else if item.starts_with("-c") || parsing_config { + let Some((mut key, value)) = item.split_once('=') else { // "-c" followed with an invalid option tracing::warn!("failed to parse the startup options: {options}"); break; }; + if !parsing_config { + // Parse "-coptions=X" + let Some(stripped_key) = key.strip_prefix("-c") else { + tracing::warn!("failed to parse the startup options: {options}"); + break; + }; + key = stripped_key; + } if key == "neon.endpoint_type" { Span::current().record("endpoint_type", field::display(value)); } else { diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index e493e8af36db..9767d0d43366 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -407,8 +407,8 @@ pageserver_connect(shardno_t shard_no, int elevel) { const char *keywords[5]; const char *values[5]; - char pid_str[16]; - char endpoint_str[36]; + char pid_str[16] = { 0 }; + char endpoint_str[36] = { 0 }; int n_pgsql_params; TimestampTz now; int64 us_since_last_attempt; @@ -481,25 +481,22 @@ pageserver_connect(shardno_t shard_no, int elevel) } { - int ret = 0; bool param_set = false; switch (neon_endpoint_type) { case EP_TYPE_PRIMARY: - ret = snprintf(endpoint_str, sizeof(endpoint_str), "-c neon.endpoint_type=primary"); + strncpy(endpoint_str, "-c neon.endpoint_type=primary", sizeof(endpoint_str)); param_set = true; break; case EP_TYPE_REPLICA: - ret = snprintf(endpoint_str, sizeof(endpoint_str), "-c neon.endpoint_type=replica"); + strncpy(endpoint_str, "-c neon.endpoint_type=replica", sizeof(endpoint_str)); param_set = true; break; case EP_TYPE_STATIC: - ret = snprintf(endpoint_str, sizeof(endpoint_str), "-c neon.endpoint_type=static"); + strncpy(endpoint_str, "-c neon.endpoint_type=static", sizeof(endpoint_str)); param_set = true; break; } - if (ret < 0 || ret >= (int)(sizeof(endpoint_str))) - elog(FATAL, "stack-allocated buffer too small to hold endpoint type"); if (param_set) { keywords[n_pgsql_params] = "options"; From 7a9fe7b7bedba873159480370848dd1274f34d3e Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Tue, 18 Mar 2025 17:10:26 -0400 Subject: [PATCH 3/4] more testing Signed-off-by: Alex Chi Z --- pageserver/src/page_service.rs | 130 +++++++++++++++++++++++---------- 1 file changed, 92 insertions(+), 38 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index a5c4f9fe9c07..1ab14c1806a8 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -2505,6 +2505,52 @@ impl PageServiceCmd { } } +fn parse_options(options: &str) -> (Vec<(String, String)>, bool) { + let mut parsing_config = false; + let mut has_error = false; + let mut config = Vec::new(); + for item in options.split_whitespace() { + if item == "-c" { + if !parsing_config { + parsing_config = true; + } else { + // "-c" followed with another "-c" + tracing::warn!("failed to parse the startup options: {options}"); + has_error = true; + break; + } + } else if item.starts_with("-c") || parsing_config { + let Some((mut key, value)) = item.split_once('=') else { + // "-c" followed with an invalid option + tracing::warn!("failed to parse the startup options: {options}"); + has_error = true; + break; + }; + if !parsing_config { + // Parse "-coptions=X" + let Some(stripped_key) = key.strip_prefix("-c") else { + tracing::warn!("failed to parse the startup options: {options}"); + has_error = true; + break; + }; + key = stripped_key; + } + config.push((key.to_string(), value.to_string())); + parsing_config = false; + } else { + tracing::warn!("failed to parse the startup options: {options}"); + has_error = true; + break; + } + } + if parsing_config { + // "-c" without the option + tracing::warn!("failed to parse the startup options: {options}"); + has_error = true; + } + (config, has_error) +} + impl postgres_backend::Handler for PageServerHandler where IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static, @@ -2550,46 +2596,12 @@ where Span::current().record("application_name", field::display(app_name)); } if let Some(options) = params.get("options") { - let mut parsing_config = false; - for item in options.split_whitespace() { - if item == "-c" { - if !parsing_config { - parsing_config = true; - } else { - // "-c" followed with another "-c" - tracing::warn!("failed to parse the startup options: {options}"); - break; - } - } else if item.starts_with("-c") || parsing_config { - let Some((mut key, value)) = item.split_once('=') else { - // "-c" followed with an invalid option - tracing::warn!("failed to parse the startup options: {options}"); - break; - }; - if !parsing_config { - // Parse "-coptions=X" - let Some(stripped_key) = key.strip_prefix("-c") else { - tracing::warn!("failed to parse the startup options: {options}"); - break; - }; - key = stripped_key; - } - if key == "neon.endpoint_type" { - Span::current().record("endpoint_type", field::display(value)); - } else { - tracing::warn!("failed to parse the startup options: {options}"); - break; - } - parsing_config = false; - } else { - tracing::warn!("failed to parse the startup options: {options}"); - break; + let (config, _) = parse_options(options); + for (key, value) in config { + if key == "neon.endpoint_type" { + Span::current().record("endpoint_type", field::display(value)); } } - if parsing_config { - // "-c" without the option - tracing::warn!("failed to parse the startup options: {options}"); - } } }; @@ -2979,4 +2991,46 @@ mod tests { let cmd = PageServiceCmd::parse(&format!("lease {tenant_id} {timeline_id} gzip 0/16ABCDE")); assert!(cmd.is_err()); } + + #[test] + fn test_parse_options() { + let (config, has_error) = parse_options(" -c neon.endpoint_type=primary "); + assert!(!has_error); + assert_eq!( + config, + vec![("neon.endpoint_type".to_string(), "primary".to_string())] + ); + + let (config, has_error) = parse_options(" -c neon.endpoint_type=primary -c foo=bar "); + assert!(!has_error); + assert_eq!( + config, + vec![ + ("neon.endpoint_type".to_string(), "primary".to_string()), + ("foo".to_string(), "bar".to_string()), + ] + ); + + let (config, has_error) = parse_options(" -c neon.endpoint_type=primary -cfoo=bar"); + assert!(!has_error); + assert_eq!( + config, + vec![ + ("neon.endpoint_type".to_string(), "primary".to_string()), + ("foo".to_string(), "bar".to_string()), + ] + ); + + let (_, has_error) = parse_options("-c"); + assert!(has_error); + + let (_, has_error) = parse_options("-c foo=bar -c -c"); + assert!(has_error); + + let (_, has_error) = parse_options(" "); + assert!(!has_error); + + let (_, has_error) = parse_options(" -c neon.endpoint_type"); + assert!(has_error); + } } From 829555c5ba5cab8b5db3e75942400693455b1b4a Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Wed, 19 Mar 2025 14:52:10 -0400 Subject: [PATCH 4/4] resolve comments Signed-off-by: Alex Chi Z --- compute_tools/src/compute.rs | 2 +- compute_tools/src/config.rs | 2 +- pageserver/src/page_service.rs | 28 ++++++++++++++---------- pgxn/neon/libpagestore.c | 40 +++++++++++++++++----------------- 4 files changed, 39 insertions(+), 33 deletions(-) diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index a9d2c87679df..6df2bb41bc75 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -858,7 +858,7 @@ impl ComputeNode { config.application_name("compute_ctl"); if let Some(spec) = &compute_state.pspec { config.options(&format!( - "-c neon.endpoint_type={}", + "-c neon.compute_mode={}", spec.spec.mode.to_type_str() )); } diff --git a/compute_tools/src/config.rs b/compute_tools/src/config.rs index 1de221c4355d..ea7797a548f1 100644 --- a/compute_tools/src/config.rs +++ b/compute_tools/src/config.rs @@ -99,7 +99,7 @@ pub fn write_postgres_conf( writeln!(file, "lc_numeric='C.UTF-8'")?; } - writeln!(file, "neon.endpoint_type={}", spec.mode.to_type_str())?; + writeln!(file, "neon.compute_mode={}", spec.mode.to_type_str())?; match spec.mode { ComputeMode::Primary => {} ComputeMode::Static(lsn) => { diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 1ab14c1806a8..1268de24258e 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -237,7 +237,7 @@ pub async fn libpq_listener_main( type ConnectionHandlerResult = anyhow::Result<()>; -#[instrument(skip_all, fields(peer_addr, application_name, endpoint_type))] +#[instrument(skip_all, fields(peer_addr, application_name, compute_mode))] #[allow(clippy::too_many_arguments)] async fn page_service_conn_main( conf: &'static PageServerConf, @@ -2505,6 +2505,12 @@ impl PageServiceCmd { } } +/// Parse the startup options from the postgres wire protocol startup packet. +/// +/// It takes a sequence of `-c option=X` or `-coption=X`. It parses the options string +/// by best effort and returns all the options parsed (key-value pairs) and a bool indicating +/// whether all options are successfully parsed. There could be duplicates in the options +/// if the caller passed such parameters. fn parse_options(options: &str) -> (Vec<(String, String)>, bool) { let mut parsing_config = false; let mut has_error = false; @@ -2598,8 +2604,8 @@ where if let Some(options) = params.get("options") { let (config, _) = parse_options(options); for (key, value) in config { - if key == "neon.endpoint_type" { - Span::current().record("endpoint_type", field::display(value)); + if key == "neon.compute_mode" { + Span::current().record("compute_mode", field::display(value)); } } } @@ -2716,7 +2722,7 @@ where PageServiceCmd::Set => { // important because psycopg2 executes "SET datestyle TO 'ISO'" // on connect - // TODO: allow setting options, i.e., application_name/endpoint_type via SET commands + // TODO: allow setting options, i.e., application_name/compute_mode via SET commands pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; } PageServiceCmd::LeaseLsn(LeaseLsnCmd { @@ -2994,29 +3000,29 @@ mod tests { #[test] fn test_parse_options() { - let (config, has_error) = parse_options(" -c neon.endpoint_type=primary "); + let (config, has_error) = parse_options(" -c neon.compute_mode=primary "); assert!(!has_error); assert_eq!( config, - vec![("neon.endpoint_type".to_string(), "primary".to_string())] + vec![("neon.compute_mode".to_string(), "primary".to_string())] ); - let (config, has_error) = parse_options(" -c neon.endpoint_type=primary -c foo=bar "); + let (config, has_error) = parse_options(" -c neon.compute_mode=primary -c foo=bar "); assert!(!has_error); assert_eq!( config, vec![ - ("neon.endpoint_type".to_string(), "primary".to_string()), + ("neon.compute_mode".to_string(), "primary".to_string()), ("foo".to_string(), "bar".to_string()), ] ); - let (config, has_error) = parse_options(" -c neon.endpoint_type=primary -cfoo=bar"); + let (config, has_error) = parse_options(" -c neon.compute_mode=primary -cfoo=bar"); assert!(!has_error); assert_eq!( config, vec![ - ("neon.endpoint_type".to_string(), "primary".to_string()), + ("neon.compute_mode".to_string(), "primary".to_string()), ("foo".to_string(), "bar".to_string()), ] ); @@ -3030,7 +3036,7 @@ mod tests { let (_, has_error) = parse_options(" "); assert!(!has_error); - let (_, has_error) = parse_options(" -c neon.endpoint_type"); + let (_, has_error) = parse_options(" -c neon.compute_mode"); assert!(has_error); } } diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index 9767d0d43366..a71de34702d1 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -51,16 +51,16 @@ #define MAX_RECONNECT_INTERVAL_USEC 1000000 -enum NeonEndpointType { - EP_TYPE_PRIMARY = 0, - EP_TYPE_REPLICA, - EP_TYPE_STATIC +enum NeonComputeMode { + CP_MODE_PRIMARY = 0, + CP_MODE_REPLICA, + CP_MODE_STATIC }; -static const struct config_enum_entry neon_endpoint_types[] = { - {"primary", EP_TYPE_PRIMARY, false}, - {"replica", EP_TYPE_REPLICA, false}, - {"static", EP_TYPE_STATIC, false}, +static const struct config_enum_entry neon_compute_modes[] = { + {"primary", CP_MODE_PRIMARY, false}, + {"replica", CP_MODE_REPLICA, false}, + {"static", CP_MODE_STATIC, false}, {NULL, 0, false} }; @@ -76,7 +76,7 @@ int flush_every_n_requests = 8; int neon_protocol_version = 2; -static int neon_endpoint_type = 0; +static int neon_compute_mode = 0; static int max_reconnect_attempts = 60; static int stripe_size; @@ -482,18 +482,18 @@ pageserver_connect(shardno_t shard_no, int elevel) { bool param_set = false; - switch (neon_endpoint_type) + switch (neon_compute_mode) { - case EP_TYPE_PRIMARY: - strncpy(endpoint_str, "-c neon.endpoint_type=primary", sizeof(endpoint_str)); + case CP_MODE_PRIMARY: + strncpy(endpoint_str, "-c neon.compute_mode=primary", sizeof(endpoint_str)); param_set = true; break; - case EP_TYPE_REPLICA: - strncpy(endpoint_str, "-c neon.endpoint_type=replica", sizeof(endpoint_str)); + case CP_MODE_REPLICA: + strncpy(endpoint_str, "-c neon.compute_mode=replica", sizeof(endpoint_str)); param_set = true; break; - case EP_TYPE_STATIC: - strncpy(endpoint_str, "-c neon.endpoint_type=static", sizeof(endpoint_str)); + case CP_MODE_STATIC: + strncpy(endpoint_str, "-c neon.compute_mode=static", sizeof(endpoint_str)); param_set = true; break; } @@ -1412,12 +1412,12 @@ pg_init_libpagestore(void) NULL, NULL, NULL); DefineCustomEnumVariable( - "neon.endpoint_type", + "neon.compute_mode", "The compute endpoint node type", NULL, - &neon_endpoint_type, - EP_TYPE_PRIMARY, - neon_endpoint_types, + &neon_compute_mode, + CP_MODE_PRIMARY, + neon_compute_modes, PGC_POSTMASTER, 0, NULL, NULL, NULL);