Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(compute_ctl): pass compute type to pageserver with pg_options #11287

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions compute_tools/src/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,14 @@ impl ComputeNode {
info!("Storage auth token not set");
}

config.application_name("compute_ctl");
if let Some(spec) = &compute_state.pspec {
config.options(&format!(
"-c neon.compute_mode={}",
spec.spec.mode.to_type_str()
));
}

// Connect to pageserver
let mut client = config.connect(NoTls)?;
let pageserver_connect_micros = start_time.elapsed().as_micros() as u64;
Expand Down
1 change: 1 addition & 0 deletions compute_tools/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ pub fn write_postgres_conf(
writeln!(file, "lc_numeric='C.UTF-8'")?;
}

writeln!(file, "neon.compute_mode={}", spec.mode.to_type_str())?;
match spec.mode {
ComputeMode::Primary => {}
ComputeMode::Static(lsn) => {
Expand Down
12 changes: 12 additions & 0 deletions libs/compute_api/src/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,18 @@ pub enum ComputeMode {
Replica,
}

impl ComputeMode {
/// Convert the compute mode to a string that can be used to identify the type of compute,
/// which means that if it's a static compute, the LSN will not be included.
pub fn to_type_str(&self) -> &'static str {
match self {
ComputeMode::Primary => "primary",
ComputeMode::Static(_) => "static",
ComputeMode::Replica => "replica",
}
}
}

/// Log level for audit logging
/// Disabled, log, hipaa
/// Default is Disabled
Expand Down
105 changes: 104 additions & 1 deletion pageserver/src/page_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ pub async fn libpq_listener_main(

type ConnectionHandlerResult = anyhow::Result<()>;

#[instrument(skip_all, fields(peer_addr, application_name))]
#[instrument(skip_all, fields(peer_addr, application_name, compute_mode))]
#[allow(clippy::too_many_arguments)]
async fn page_service_conn_main(
conf: &'static PageServerConf,
Expand Down Expand Up @@ -2505,6 +2505,58 @@ impl PageServiceCmd {
}
}

/// Parse the startup options from the postgres wire protocol startup packet.
///
/// It takes a sequence of `-c option=X` or `-coption=X`. It parses the options string
/// by best effort and returns all the options parsed (key-value pairs) and a bool indicating
/// whether all options are successfully parsed. There could be duplicates in the options
/// if the caller passed such parameters.
fn parse_options(options: &str) -> (Vec<(String, String)>, bool) {
let mut parsing_config = false;
let mut has_error = false;
let mut config = Vec::new();
for item in options.split_whitespace() {
if item == "-c" {
if !parsing_config {
parsing_config = true;
} else {
// "-c" followed with another "-c"
tracing::warn!("failed to parse the startup options: {options}");
has_error = true;
break;
}
} else if item.starts_with("-c") || parsing_config {
let Some((mut key, value)) = item.split_once('=') else {
// "-c" followed with an invalid option
tracing::warn!("failed to parse the startup options: {options}");
has_error = true;
break;
};
if !parsing_config {
// Parse "-coptions=X"
let Some(stripped_key) = key.strip_prefix("-c") else {
tracing::warn!("failed to parse the startup options: {options}");
has_error = true;
break;
};
key = stripped_key;
}
config.push((key.to_string(), value.to_string()));
parsing_config = false;
} else {
tracing::warn!("failed to parse the startup options: {options}");
has_error = true;
break;
}
}
if parsing_config {
// "-c" without the option
tracing::warn!("failed to parse the startup options: {options}");
has_error = true;
}
(config, has_error)
}

impl<IO> postgres_backend::Handler<IO> for PageServerHandler
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
Expand Down Expand Up @@ -2549,6 +2601,14 @@ where
if let Some(app_name) = params.get("application_name") {
Span::current().record("application_name", field::display(app_name));
}
if let Some(options) = params.get("options") {
let (config, _) = parse_options(options);
for (key, value) in config {
if key == "neon.compute_mode" {
Span::current().record("compute_mode", field::display(value));
}
}
}
};

Ok(())
Expand Down Expand Up @@ -2662,6 +2722,7 @@ where
PageServiceCmd::Set => {
// important because psycopg2 executes "SET datestyle TO 'ISO'"
// on connect
// TODO: allow setting options, i.e., application_name/compute_mode via SET commands
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
}
PageServiceCmd::LeaseLsn(LeaseLsnCmd {
Expand Down Expand Up @@ -2936,4 +2997,46 @@ mod tests {
let cmd = PageServiceCmd::parse(&format!("lease {tenant_id} {timeline_id} gzip 0/16ABCDE"));
assert!(cmd.is_err());
}

#[test]
fn test_parse_options() {
let (config, has_error) = parse_options(" -c neon.compute_mode=primary ");
assert!(!has_error);
assert_eq!(
config,
vec![("neon.compute_mode".to_string(), "primary".to_string())]
);

let (config, has_error) = parse_options(" -c neon.compute_mode=primary -c foo=bar ");
assert!(!has_error);
assert_eq!(
config,
vec![
("neon.compute_mode".to_string(), "primary".to_string()),
("foo".to_string(), "bar".to_string()),
]
);

let (config, has_error) = parse_options(" -c neon.compute_mode=primary -cfoo=bar");
assert!(!has_error);
assert_eq!(
config,
vec![
("neon.compute_mode".to_string(), "primary".to_string()),
("foo".to_string(), "bar".to_string()),
]
);

let (_, has_error) = parse_options("-c");
assert!(has_error);

let (_, has_error) = parse_options("-c foo=bar -c -c");
assert!(has_error);

let (_, has_error) = parse_options(" ");
assert!(!has_error);

let (_, has_error) = parse_options(" -c neon.compute_mode");
assert!(has_error);
}
}
58 changes: 55 additions & 3 deletions pgxn/neon/libpagestore.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,20 @@
#define MIN_RECONNECT_INTERVAL_USEC 1000
#define MAX_RECONNECT_INTERVAL_USEC 1000000


enum NeonComputeMode {
CP_MODE_PRIMARY = 0,
CP_MODE_REPLICA,
CP_MODE_STATIC
};

static const struct config_enum_entry neon_compute_modes[] = {
{"primary", CP_MODE_PRIMARY, false},
{"replica", CP_MODE_REPLICA, false},
{"static", CP_MODE_STATIC, false},
{NULL, 0, false}
};

/* GUCs */
char *neon_timeline;
char *neon_tenant;
Expand All @@ -62,6 +76,7 @@ int flush_every_n_requests = 8;

int neon_protocol_version = 2;

static int neon_compute_mode = 0;
static int max_reconnect_attempts = 60;
static int stripe_size;

Expand Down Expand Up @@ -390,9 +405,10 @@ pageserver_connect(shardno_t shard_no, int elevel)
{
case PS_Disconnected:
{
const char *keywords[4];
const char *values[4];
char pid_str[16];
const char *keywords[5];
const char *values[5];
char pid_str[16] = { 0 };
char endpoint_str[36] = { 0 };
int n_pgsql_params;
TimestampTz now;
int64 us_since_last_attempt;
Expand Down Expand Up @@ -464,6 +480,31 @@ pageserver_connect(shardno_t shard_no, int elevel)
n_pgsql_params++;
}

{
bool param_set = false;
switch (neon_compute_mode)
{
case CP_MODE_PRIMARY:
strncpy(endpoint_str, "-c neon.compute_mode=primary", sizeof(endpoint_str));
param_set = true;
break;
case CP_MODE_REPLICA:
strncpy(endpoint_str, "-c neon.compute_mode=replica", sizeof(endpoint_str));
param_set = true;
break;
case CP_MODE_STATIC:
strncpy(endpoint_str, "-c neon.compute_mode=static", sizeof(endpoint_str));
param_set = true;
break;
}
if (param_set)
{
keywords[n_pgsql_params] = "options";
values[n_pgsql_params] = endpoint_str;
n_pgsql_params++;
}
}

keywords[n_pgsql_params] = NULL;
values[n_pgsql_params] = NULL;

Expand Down Expand Up @@ -1370,6 +1411,17 @@ pg_init_libpagestore(void)
GUC_UNIT_MS,
NULL, NULL, NULL);

DefineCustomEnumVariable(
"neon.compute_mode",
"The compute endpoint node type",
NULL,
&neon_compute_mode,
CP_MODE_PRIMARY,
neon_compute_modes,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);

relsize_hash_init();

if (page_server != NULL)
Expand Down
Loading