Skip to content

Commit 86dd34f

Browse files
committed
feat(compute_ctl): pass compute type to pageserver with pg_options
Signed-off-by: Alex Chi Z <[email protected]>
1 parent 083a30b commit 86dd34f

File tree

5 files changed

+115
-3
lines changed

5 files changed

+115
-3
lines changed

compute_tools/src/compute.rs

+8
Original file line numberDiff line numberDiff line change
@@ -855,6 +855,14 @@ impl ComputeNode {
855855
info!("Storage auth token not set");
856856
}
857857

858+
config.application_name("compute_ctl");
859+
if let Some(spec) = &compute_state.pspec {
860+
config.options(&format!(
861+
"-c neon.endpoint_type={}",
862+
spec.spec.mode.to_type_str()
863+
));
864+
}
865+
858866
// Connect to pageserver
859867
let mut client = config.connect(NoTls)?;
860868
let pageserver_connect_micros = start_time.elapsed().as_micros() as u64;

compute_tools/src/config.rs

+1
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ pub fn write_postgres_conf(
9999
writeln!(file, "lc_numeric='C.UTF-8'")?;
100100
}
101101

102+
writeln!(file, "neon.endpoint_type={}", spec.mode.to_type_str())?;
102103
match spec.mode {
103104
ComputeMode::Primary => {}
104105
ComputeMode::Static(lsn) => {

libs/compute_api/src/spec.rs

+12
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,18 @@ pub enum ComputeMode {
272272
Replica,
273273
}
274274

275+
impl ComputeMode {
276+
/// Convert the compute mode to a string that can be used to identify the type of compute,
277+
/// which means that if it's a static compute, the LSN will not be included.
278+
pub fn to_type_str(&self) -> &'static str {
279+
match self {
280+
ComputeMode::Primary => "primary",
281+
ComputeMode::Static(_) => "static",
282+
ComputeMode::Replica => "replica",
283+
}
284+
}
285+
}
286+
275287
/// Log level for audit logging
276288
/// Disabled, log, hipaa
277289
/// Default is Disabled

pageserver/src/page_service.rs

+36-1
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ pub async fn libpq_listener_main(
237237

238238
type ConnectionHandlerResult = anyhow::Result<()>;
239239

240-
#[instrument(skip_all, fields(peer_addr, application_name))]
240+
#[instrument(skip_all, fields(peer_addr, application_name, endpoint_type))]
241241
#[allow(clippy::too_many_arguments)]
242242
async fn page_service_conn_main(
243243
conf: &'static PageServerConf,
@@ -2549,6 +2549,40 @@ where
25492549
if let Some(app_name) = params.get("application_name") {
25502550
Span::current().record("application_name", field::display(app_name));
25512551
}
2552+
if let Some(options) = params.get("options") {
2553+
let mut parsing_config = false;
2554+
for item in options.split_whitespace() {
2555+
if item == "-c" {
2556+
if !parsing_config {
2557+
parsing_config = true;
2558+
} else {
2559+
// "-c" followed with another "-c"
2560+
tracing::warn!("failed to parse the startup options: {options}");
2561+
break;
2562+
}
2563+
} else if parsing_config {
2564+
let Some((key, value)) = item.split_once('=') else {
2565+
// "-c" followed with an invalid option
2566+
tracing::warn!("failed to parse the startup options: {options}");
2567+
break;
2568+
};
2569+
if key == "neon.endpoint_type" {
2570+
Span::current().record("endpoint_type", field::display(value));
2571+
} else {
2572+
tracing::warn!("failed to parse the startup options: {options}");
2573+
break;
2574+
}
2575+
parsing_config = false;
2576+
} else {
2577+
tracing::warn!("failed to parse the startup options: {options}");
2578+
break;
2579+
}
2580+
}
2581+
if parsing_config {
2582+
// "-c" without the option
2583+
tracing::warn!("failed to parse the startup options: {options}");
2584+
}
2585+
}
25522586
};
25532587

25542588
Ok(())
@@ -2662,6 +2696,7 @@ where
26622696
PageServiceCmd::Set => {
26632697
// important because psycopg2 executes "SET datestyle TO 'ISO'"
26642698
// on connect
2699+
// TODO: allow setting options, i.e., application_name/endpoint_type via SET commands
26652700
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
26662701
}
26672702
PageServiceCmd::LeaseLsn(LeaseLsnCmd {

pgxn/neon/libpagestore.c

+58-2
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,20 @@
5050
#define MIN_RECONNECT_INTERVAL_USEC 1000
5151
#define MAX_RECONNECT_INTERVAL_USEC 1000000
5252

53+
54+
enum NeonEndpointType {
55+
EP_TYPE_PRIMARY = 0,
56+
EP_TYPE_REPLICA,
57+
EP_TYPE_STATIC
58+
};
59+
60+
static const struct config_enum_entry neon_endpoint_types[] = {
61+
{"primary", EP_TYPE_PRIMARY, false},
62+
{"replica", EP_TYPE_REPLICA, false},
63+
{"static", EP_TYPE_STATIC, false},
64+
{NULL, 0, false}
65+
};
66+
5367
/* GUCs */
5468
char *neon_timeline;
5569
char *neon_tenant;
@@ -62,6 +76,8 @@ int flush_every_n_requests = 8;
6276

6377
int neon_protocol_version = 2;
6478

79+
static int neon_endpoint_type = 0;
80+
6581
static int max_reconnect_attempts = 60;
6682
static int stripe_size;
6783

@@ -390,9 +406,10 @@ pageserver_connect(shardno_t shard_no, int elevel)
390406
{
391407
case PS_Disconnected:
392408
{
393-
const char *keywords[4];
394-
const char *values[4];
409+
const char *keywords[5];
410+
const char *values[5];
395411
char pid_str[16];
412+
char endpoint_str[36];
396413
int n_pgsql_params;
397414
TimestampTz now;
398415
int64 us_since_last_attempt;
@@ -464,6 +481,34 @@ pageserver_connect(shardno_t shard_no, int elevel)
464481
n_pgsql_params++;
465482
}
466483

484+
{
485+
int ret = 0;
486+
bool param_set = false;
487+
switch (neon_endpoint_type)
488+
{
489+
case EP_TYPE_PRIMARY:
490+
ret = snprintf(endpoint_str, sizeof(endpoint_str), "-c neon.endpoint_type=primary");
491+
param_set = true;
492+
break;
493+
case EP_TYPE_REPLICA:
494+
ret = snprintf(endpoint_str, sizeof(endpoint_str), "-c neon.endpoint_type=replica");
495+
param_set = true;
496+
break;
497+
case EP_TYPE_STATIC:
498+
ret = snprintf(endpoint_str, sizeof(endpoint_str), "-c neon.endpoint_type=static");
499+
param_set = true;
500+
break;
501+
}
502+
if (ret < 0 || ret >= (int)(sizeof(endpoint_str)))
503+
elog(FATAL, "stack-allocated buffer too small to hold endpoint type");
504+
if (param_set)
505+
{
506+
keywords[n_pgsql_params] = "options";
507+
values[n_pgsql_params] = endpoint_str;
508+
n_pgsql_params++;
509+
}
510+
}
511+
467512
keywords[n_pgsql_params] = NULL;
468513
values[n_pgsql_params] = NULL;
469514

@@ -1370,6 +1415,17 @@ pg_init_libpagestore(void)
13701415
GUC_UNIT_MS,
13711416
NULL, NULL, NULL);
13721417

1418+
DefineCustomEnumVariable(
1419+
"neon.endpoint_type",
1420+
"The compute endpoint node type",
1421+
NULL,
1422+
&neon_endpoint_type,
1423+
EP_TYPE_PRIMARY,
1424+
neon_endpoint_types,
1425+
PGC_POSTMASTER,
1426+
0,
1427+
NULL, NULL, NULL);
1428+
13731429
relsize_hash_init();
13741430

13751431
if (page_server != NULL)

0 commit comments

Comments
 (0)