Skip to content

Commit

Permalink
Use compute_ctl to launch endpoints.
Browse files Browse the repository at this point in the history
This adds test coverage for 'compute_ctl', as it is now used by all
the python tests.

There are a few differences in how 'compute_ctl' is called in the
tests, compared to the real web console:

- In the tests, the postgresql.conf file is included as one large
  string in the spec file, and it is written out as it is to the data
  directory.  I added a new field for that to the spec file. The real
  web console, however, sets all the necessary settings in the
  'settings' field, and 'compute_ctl' creates the postgresql.conf from
  those settings.

- In the tests, the information needed to connect to the storage, i.e.
  tenant_id, timeline_id, connection strings to pageserver and
  safekeepers, are now passed as new fields in the spec file. The real
  web console includes them as the GUCs in the 'settings' field. (Both
  of these are different from what the test control plane used to do:
  It used to write the GUCs directly in the postgresql.conf file). The
  plan is to change the control plane to use the new method, and
  remove the old method, but for now, support both.

Some tests that were sensitive to the amount of WAL generated needed
small changes, to accommodate that compute_ctl runs the background
health monitor which makes a few small updates. Also some tests shut
down the pageserver, and now that the background health check can run
some queries while the pageserver is down, that can produce a few
extra errors in the logs, which needed to be allowlisted.
  • Loading branch information
hlinnaka committed May 9, 2023
1 parent db80854 commit 75227e8
Show file tree
Hide file tree
Showing 21 changed files with 631 additions and 659 deletions.
46 changes: 29 additions & 17 deletions compute_tools/src/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,26 +106,38 @@ pub struct ParsedSpec {
impl TryFrom<ComputeSpec> for ParsedSpec {
type Error = String;
fn try_from(spec: ComputeSpec) -> Result<Self, String> {
// Extract the options from the spec file that are needed to connect to
// the storage system.
//
// For backwards-compatibility, the top-level fields in the spec file
// may be empty. In that case, we need to dig them from the GUCs in the
// cluster.settings field.
let pageserver_connstr = spec
.cluster
.settings
.find("neon.pageserver_connstring")
.pageserver_connstring
.clone()
.or_else(|| spec.cluster.settings.find("neon.pageserver_connstring"))
.ok_or("pageserver connstr should be provided")?;
let storage_auth_token = spec.storage_auth_token.clone();
let tenant_id: TenantId = spec
.cluster
.settings
.find("neon.tenant_id")
.ok_or("tenant id should be provided")
.map(|s| TenantId::from_str(&s))?
.or(Err("invalid tenant id"))?;
let timeline_id: TimelineId = spec
.cluster
.settings
.find("neon.timeline_id")
.ok_or("timeline id should be provided")
.map(|s| TimelineId::from_str(&s))?
.or(Err("invalid timeline id"))?;
let tenant_id: TenantId = if let Some(tenant_id) = spec.tenant_id {
tenant_id
} else {
spec.cluster
.settings
.find("neon.tenant_id")
.ok_or("tenant id should be provided")
.map(|s| TenantId::from_str(&s))?
.or(Err("invalid tenant id"))?
};
let timeline_id: TimelineId = if let Some(timeline_id) = spec.timeline_id {
timeline_id
} else {
spec.cluster
.settings
.find("neon.timeline_id")
.ok_or("timeline id should be provided")
.map(|s| TimelineId::from_str(&s))?
.or(Err("invalid timeline id"))?
};

Ok(ParsedSpec {
spec,
Expand Down
44 changes: 42 additions & 2 deletions compute_tools/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::path::Path;

use anyhow::Result;

use crate::pg_helpers::escape_conf_value;
use crate::pg_helpers::PgOptionsSerialize;
use compute_api::spec::{ComputeMode, ComputeSpec};

Expand Down Expand Up @@ -36,10 +37,44 @@ pub fn write_postgres_conf(path: &Path, spec: &ComputeSpec) -> Result<()> {
// File::create() destroys the file content if it exists.
let mut file = File::create(path)?;

writeln!(file, "# Managed by compute_ctl: begin")?;
// Write the postgresql.conf content from the spec file as is.
if let Some(conf) = &spec.cluster.postgresql_conf {
writeln!(file, "{}", conf)?;
}

write!(file, "{}", &spec.cluster.settings.as_pg_settings())?;

// Add options for connecting to storage
writeln!(file, "# Neon storage settings")?;
if let Some(s) = &spec.pageserver_connstring {
writeln!(
file,
"neon.pageserver_connstring='{}'",
escape_conf_value(s)
)?;
}
if !spec.safekeeper_connstrings.is_empty() {
writeln!(
file,
"neon.safekeepers='{}'",
escape_conf_value(&spec.safekeeper_connstrings.join(","))
)?;
}
if let Some(s) = &spec.tenant_id {
writeln!(
file,
"neon.tenant_id='{}'",
escape_conf_value(&s.to_string())
)?;
}
if let Some(s) = &spec.timeline_id {
writeln!(
file,
"neon.timeline_id='{}'",
escape_conf_value(&s.to_string())
)?;
}

match spec.mode {
ComputeMode::Primary => {}
ComputeMode::Static(lsn) => {
Expand All @@ -53,7 +88,12 @@ pub fn write_postgres_conf(path: &Path, spec: &ComputeSpec) -> Result<()> {
}
}

writeln!(file, "# Managed by compute_ctl: end")?;
// If there are any extra options in the 'settings' field, append those
if spec.cluster.settings.is_some() {
writeln!(file, "# Managed by compute_ctl: begin")?;
write!(file, "{}", spec.cluster.settings.as_pg_settings())?;
writeln!(file, "# Managed by compute_ctl: end")?;
}

Ok(())
}
2 changes: 1 addition & 1 deletion compute_tools/src/pg_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ fn escape_literal(s: &str) -> String {

/// Escape a string so that it can be used in postgresql.conf.
/// Same as escape_literal, currently.
fn escape_conf_value(s: &str) -> String {
pub fn escape_conf_value(s: &str) -> String {
s.replace('\'', "''").replace('\\', "\\\\")
}

Expand Down
70 changes: 55 additions & 15 deletions control_plane/src/bin/neon_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,10 +476,11 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -

println!("Creating endpoint for imported timeline ...");
cplane.new_endpoint(
tenant_id,
name,
tenant_id,
timeline_id,
None,
None,
pg_version,
ComputeMode::Primary,
)?;
Expand Down Expand Up @@ -591,7 +592,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(

table.add_row([
endpoint_id.as_str(),
&endpoint.address.to_string(),
&endpoint.pg_address.to_string(),
&endpoint.timeline_id.to_string(),
branch_name,
lsn_str.as_str(),
Expand Down Expand Up @@ -620,8 +621,8 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.get_branch_timeline_id(branch_name, tenant_id)
.ok_or_else(|| anyhow!("Found no timeline id for branch name '{branch_name}'"))?;

let port: Option<u16> = sub_args.get_one::<u16>("port").copied();

let pg_port: Option<u16> = sub_args.get_one::<u16>("pg-port").copied();
let http_port: Option<u16> = sub_args.get_one::<u16>("http-port").copied();
let pg_version = sub_args
.get_one::<u32>("pg-version")
.copied()
Expand All @@ -639,14 +640,38 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
(Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
};

cplane.new_endpoint(tenant_id, &endpoint_id, timeline_id, port, pg_version, mode)?;
cplane.new_endpoint(
&endpoint_id,
tenant_id,
timeline_id,
pg_port,
http_port,
pg_version,
mode,
)?;
}
"start" => {
let port: Option<u16> = sub_args.get_one::<u16>("port").copied();
let pg_port: Option<u16> = sub_args.get_one::<u16>("pg-port").copied();
let http_port: Option<u16> = sub_args.get_one::<u16>("http-port").copied();
let endpoint_id = sub_args
.get_one::<String>("endpoint_id")
.ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?;

// If --safekeepers argument is given, use only the listed safekeeper nodes.
let safekeepers =
if let Some(safekeepers_str) = sub_args.get_one::<String>("safekeepers") {
let mut safekeepers: Vec<NodeId> = Vec::new();
for sk_id in safekeepers_str.split(',').map(str::trim) {
let sk_id = NodeId(u64::from_str(sk_id).map_err(|_| {
anyhow!("invalid node ID \"{sk_id}\" in --safekeepers list")
})?);
safekeepers.push(sk_id);
}
safekeepers
} else {
env.safekeepers.iter().map(|sk| sk.id).collect()
};

let endpoint = cplane.endpoints.get(endpoint_id.as_str());

let auth_token = if matches!(env.pageserver.pg_auth_type, AuthType::NeonJWT) {
Expand All @@ -673,7 +698,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
_ => {}
}
println!("Starting existing endpoint {endpoint_id}...");
endpoint.start(&auth_token)?;
endpoint.start(&auth_token, safekeepers)?;
} else {
let branch_name = sub_args
.get_one::<String>("branch-name")
Expand Down Expand Up @@ -709,14 +734,15 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
println!("Starting new endpoint {endpoint_id} (PostgreSQL v{pg_version}) on timeline {timeline_id} ...");

let ep = cplane.new_endpoint(
tenant_id,
endpoint_id,
tenant_id,
timeline_id,
port,
pg_port,
http_port,
pg_version,
mode,
)?;
ep.start(&auth_token)?;
ep.start(&auth_token, safekeepers)?;
}
}
"stop" => {
Expand Down Expand Up @@ -944,11 +970,22 @@ fn cli() -> Command {
.value_parser(value_parser!(u32))
.default_value(DEFAULT_PG_VERSION);

let port_arg = Arg::new("port")
.long("port")
let pg_port_arg = Arg::new("pg-port")
.long("pg-port")
.required(false)
.value_parser(value_parser!(u16))
.value_name("port");
.value_name("pg-port");

let http_port_arg = Arg::new("http-port")
.long("http-port")
.required(false)
.value_parser(value_parser!(u16))
.value_name("http-port");

let safekeepers_arg = Arg::new("safekeepers")
.long("safekeepers")
.required(false)
.value_name("safekeepers");

let stop_mode_arg = Arg::new("stop-mode")
.short('m')
Expand Down Expand Up @@ -1093,7 +1130,8 @@ fn cli() -> Command {
.arg(branch_name_arg.clone())
.arg(tenant_id_arg.clone())
.arg(lsn_arg.clone())
.arg(port_arg.clone())
.arg(pg_port_arg.clone())
.arg(http_port_arg.clone())
.arg(
Arg::new("config-only")
.help("Don't do basebackup, create endpoint directory with only config files")
Expand All @@ -1109,9 +1147,11 @@ fn cli() -> Command {
.arg(branch_name_arg)
.arg(timeline_id_arg)
.arg(lsn_arg)
.arg(port_arg)
.arg(pg_port_arg)
.arg(http_port_arg)
.arg(pg_version_arg)
.arg(hot_standby_arg)
.arg(safekeepers_arg)
)
.subcommand(
Command::new("stop")
Expand Down
6 changes: 6 additions & 0 deletions control_plane/src/broker.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
//! Code to manage the storage broker
//!
//! In the local test environment, the data for each safekeeper is stored in
//!
//! .neon/safekeepers/<safekeeper id>
//!
use anyhow::Context;

use std::path::PathBuf;
Expand Down
Loading

0 comments on commit 75227e8

Please sign in to comment.