Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use compute_ctl to manage Postgres in tests. #3886

Merged
merged 12 commits into from
Jun 6, 2023
Merged
14 changes: 13 additions & 1 deletion compute_tools/src/bin/compute_ctl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ fn main() -> Result<()> {

let matches = cli().get_matches();

let http_port = *matches
.get_one::<u16>("http-port")
.expect("http-port is required");
let pgdata = matches
.get_one::<String>("pgdata")
.expect("PGDATA path is required");
Expand Down Expand Up @@ -178,7 +181,8 @@ fn main() -> Result<()> {

// Launch http service first, so we were able to serve control-plane
// requests, while configuration is still in progress.
let _http_handle = launch_http_server(&compute).expect("cannot launch http endpoint thread");
let _http_handle =
launch_http_server(http_port, &compute).expect("cannot launch http endpoint thread");

if !spec_set {
// No spec provided, hang waiting for it.
Expand Down Expand Up @@ -286,6 +290,14 @@ fn cli() -> clap::Command {
let version = option_env!("CARGO_PKG_VERSION").unwrap_or("unknown");
clap::Command::new("compute_ctl")
.version(version)
.arg(
Arg::new("http-port")
.long("http-port")
.value_name("HTTP_PORT")
.default_value("3080")
hlinnaka marked this conversation as resolved.
Show resolved Hide resolved
.value_parser(clap::value_parser!(u16))
.required(false),
)
.arg(
Arg::new("connstr")
.short('C')
Expand Down
70 changes: 33 additions & 37 deletions compute_tools/src/compute.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,3 @@
//
// XXX: This starts to be scarry similar to the `PostgresNode` from `control_plane`,
// but there are several things that makes `PostgresNode` usage inconvenient in the
// cloud:
// - it inherits from `LocalEnv`, which contains **all-all** the information about
// a complete service running
// - it uses `PageServerNode` with information about http endpoint, which we do not
// need in the cloud again
// - many tiny pieces like, for example, we do not use `pg_ctl` in the cloud
//
// Thus, to use `PostgresNode` in the cloud, we need to 'mock' a bunch of required
// attributes (not required for the cloud). Yet, it is still tempting to unify these
// `PostgresNode` and `ComputeNode` and use one in both places.
//
// TODO: stabilize `ComputeNode` and think about using it in the `control_plane`.
//
use std::fs;
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
Expand Down Expand Up @@ -106,26 +90,38 @@ pub struct ParsedSpec {
impl TryFrom<ComputeSpec> for ParsedSpec {
type Error = String;
fn try_from(spec: ComputeSpec) -> Result<Self, String> {
// Extract the options from the spec file that are needed to connect to
// the storage system.
//
// For backwards-compatibility, the top-level fields in the spec file
// may be empty. In that case, we need to dig them from the GUCs in the
// cluster.settings field.
let pageserver_connstr = spec
.cluster
.settings
.find("neon.pageserver_connstring")
.pageserver_connstring
.clone()
.or_else(|| spec.cluster.settings.find("neon.pageserver_connstring"))
.ok_or("pageserver connstr should be provided")?;
let storage_auth_token = spec.storage_auth_token.clone();
let tenant_id: TenantId = spec
.cluster
.settings
.find("neon.tenant_id")
.ok_or("tenant id should be provided")
.map(|s| TenantId::from_str(&s))?
.or(Err("invalid tenant id"))?;
let timeline_id: TimelineId = spec
.cluster
.settings
.find("neon.timeline_id")
.ok_or("timeline id should be provided")
.map(|s| TimelineId::from_str(&s))?
.or(Err("invalid timeline id"))?;
let tenant_id: TenantId = if let Some(tenant_id) = spec.tenant_id {
tenant_id
} else {
spec.cluster
.settings
.find("neon.tenant_id")
.ok_or("tenant id should be provided")
.map(|s| TenantId::from_str(&s))?
.or(Err("invalid tenant id"))?
};
let timeline_id: TimelineId = if let Some(timeline_id) = spec.timeline_id {
timeline_id
} else {
spec.cluster
.settings
.find("neon.timeline_id")
.ok_or("timeline id should be provided")
.map(|s| TimelineId::from_str(&s))?
.or(Err("invalid timeline id"))?
};

Ok(ParsedSpec {
spec,
Expand Down Expand Up @@ -295,8 +291,8 @@ impl ComputeNode {
update_pg_hba(pgdata_path)?;

match spec.mode {
ComputeMode::Primary | ComputeMode::Static(..) => {}
ComputeMode::Replica => {
ComputeMode::Primary => {}
ComputeMode::Replica | ComputeMode::Static(..) => {
add_standby_signal(pgdata_path)?;
}
}
Expand Down Expand Up @@ -376,7 +372,7 @@ impl ComputeNode {

info!(
"finished configuration of compute for project {}",
spec.cluster.cluster_id
spec.cluster.cluster_id.as_deref().unwrap_or("None")
);

Ok(())
Expand Down Expand Up @@ -434,7 +430,7 @@ impl ComputeNode {
let spec = compute_state.pspec.as_ref().expect("spec must be set");
info!(
"starting compute for project {}, operation {}, tenant {}, timeline {}",
spec.spec.cluster.cluster_id,
spec.spec.cluster.cluster_id.as_deref().unwrap_or("None"),
spec.spec.operation_uuid.as_deref().unwrap_or("None"),
spec.tenant_id,
spec.timeline_id,
Expand Down
44 changes: 42 additions & 2 deletions compute_tools/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::path::Path;

use anyhow::Result;

use crate::pg_helpers::escape_conf_value;
use crate::pg_helpers::PgOptionsSerialize;
use compute_api::spec::{ComputeMode, ComputeSpec};

Expand Down Expand Up @@ -36,10 +37,44 @@ pub fn write_postgres_conf(path: &Path, spec: &ComputeSpec) -> Result<()> {
// File::create() destroys the file content if it exists.
let mut file = File::create(path)?;

writeln!(file, "# Managed by compute_ctl: begin")?;
// Write the postgresql.conf content from the spec file as is.
if let Some(conf) = &spec.cluster.postgresql_conf {
writeln!(file, "{}", conf)?;
}

write!(file, "{}", &spec.cluster.settings.as_pg_settings())?;

// Add options for connecting to storage
writeln!(file, "# Neon storage settings")?;
if let Some(s) = &spec.pageserver_connstring {
writeln!(
file,
"neon.pageserver_connstring='{}'",
escape_conf_value(s)
)?;
}
if !spec.safekeeper_connstrings.is_empty() {
writeln!(
file,
"neon.safekeepers='{}'",
escape_conf_value(&spec.safekeeper_connstrings.join(","))
)?;
}
if let Some(s) = &spec.tenant_id {
writeln!(
file,
"neon.tenant_id='{}'",
escape_conf_value(&s.to_string())
)?;
}
if let Some(s) = &spec.timeline_id {
writeln!(
file,
"neon.timeline_id='{}'",
escape_conf_value(&s.to_string())
)?;
}
hlinnaka marked this conversation as resolved.
Show resolved Hide resolved

match spec.mode {
ComputeMode::Primary => {}
ComputeMode::Static(lsn) => {
Expand All @@ -53,7 +88,12 @@ pub fn write_postgres_conf(path: &Path, spec: &ComputeSpec) -> Result<()> {
}
}

writeln!(file, "# Managed by compute_ctl: end")?;
// If there are any extra options in the 'settings' field, append those
if spec.cluster.settings.is_some() {
writeln!(file, "# Managed by compute_ctl: begin")?;
write!(file, "{}", spec.cluster.settings.as_pg_settings())?;
writeln!(file, "# Managed by compute_ctl: end")?;
}

Ok(())
}
8 changes: 4 additions & 4 deletions compute_tools/src/http/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,8 @@ fn render_json_error(e: &str, status: StatusCode) -> Response<Body> {

// Main Hyper HTTP server function that runs it and blocks waiting on it forever.
#[tokio::main]
async fn serve(state: Arc<ComputeNode>) {
let addr = SocketAddr::from(([0, 0, 0, 0], 3080));
async fn serve(port: u16, state: Arc<ComputeNode>) {
let addr = SocketAddr::from(([0, 0, 0, 0], port));

let make_service = make_service_fn(move |_conn| {
let state = state.clone();
Expand Down Expand Up @@ -256,10 +256,10 @@ async fn serve(state: Arc<ComputeNode>) {
}

/// Launch a separate Hyper HTTP API server thread and return its `JoinHandle`.
pub fn launch_http_server(state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
pub fn launch_http_server(port: u16, state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
let state = Arc::clone(state);

Ok(thread::Builder::new()
.name("http-endpoint".into())
.spawn(move || serve(state))?)
.spawn(move || serve(port, state))?)
}
2 changes: 1 addition & 1 deletion compute_tools/src/pg_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ fn escape_literal(s: &str) -> String {

/// Escape a string so that it can be used in postgresql.conf.
/// Same as escape_literal, currently.
fn escape_conf_value(s: &str) -> String {
pub fn escape_conf_value(s: &str) -> String {
s.replace('\'', "''").replace('\\', "\\\\")
}

Expand Down
70 changes: 55 additions & 15 deletions control_plane/src/bin/neon_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,10 +476,11 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -

println!("Creating endpoint for imported timeline ...");
cplane.new_endpoint(
tenant_id,
name,
tenant_id,
timeline_id,
None,
None,
pg_version,
ComputeMode::Primary,
)?;
Expand Down Expand Up @@ -591,7 +592,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(

table.add_row([
endpoint_id.as_str(),
&endpoint.address.to_string(),
&endpoint.pg_address.to_string(),
&endpoint.timeline_id.to_string(),
branch_name,
lsn_str.as_str(),
Expand Down Expand Up @@ -620,8 +621,8 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.get_branch_timeline_id(branch_name, tenant_id)
.ok_or_else(|| anyhow!("Found no timeline id for branch name '{branch_name}'"))?;

let port: Option<u16> = sub_args.get_one::<u16>("port").copied();

let pg_port: Option<u16> = sub_args.get_one::<u16>("pg-port").copied();
let http_port: Option<u16> = sub_args.get_one::<u16>("http-port").copied();
let pg_version = sub_args
.get_one::<u32>("pg-version")
.copied()
Expand All @@ -639,14 +640,38 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
(Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
};

cplane.new_endpoint(tenant_id, &endpoint_id, timeline_id, port, pg_version, mode)?;
cplane.new_endpoint(
&endpoint_id,
tenant_id,
timeline_id,
pg_port,
http_port,
pg_version,
mode,
)?;
}
"start" => {
let port: Option<u16> = sub_args.get_one::<u16>("port").copied();
let pg_port: Option<u16> = sub_args.get_one::<u16>("pg-port").copied();
let http_port: Option<u16> = sub_args.get_one::<u16>("http-port").copied();
let endpoint_id = sub_args
.get_one::<String>("endpoint_id")
.ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?;

// If --safekeepers argument is given, use only the listed safekeeper nodes.
let safekeepers =
if let Some(safekeepers_str) = sub_args.get_one::<String>("safekeepers") {
let mut safekeepers: Vec<NodeId> = Vec::new();
for sk_id in safekeepers_str.split(',').map(str::trim) {
let sk_id = NodeId(u64::from_str(sk_id).map_err(|_| {
anyhow!("invalid node ID \"{sk_id}\" in --safekeepers list")
})?);
safekeepers.push(sk_id);
}
safekeepers
} else {
env.safekeepers.iter().map(|sk| sk.id).collect()
};

let endpoint = cplane.endpoints.get(endpoint_id.as_str());

let auth_token = if matches!(env.pageserver.pg_auth_type, AuthType::NeonJWT) {
Expand All @@ -673,7 +698,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
_ => {}
}
println!("Starting existing endpoint {endpoint_id}...");
endpoint.start(&auth_token)?;
endpoint.start(&auth_token, safekeepers)?;
} else {
let branch_name = sub_args
.get_one::<String>("branch-name")
Expand Down Expand Up @@ -709,14 +734,15 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
println!("Starting new endpoint {endpoint_id} (PostgreSQL v{pg_version}) on timeline {timeline_id} ...");

let ep = cplane.new_endpoint(
tenant_id,
endpoint_id,
tenant_id,
timeline_id,
port,
pg_port,
http_port,
pg_version,
mode,
)?;
ep.start(&auth_token)?;
ep.start(&auth_token, safekeepers)?;
}
}
"stop" => {
Expand Down Expand Up @@ -944,11 +970,22 @@ fn cli() -> Command {
.value_parser(value_parser!(u32))
.default_value(DEFAULT_PG_VERSION);

let port_arg = Arg::new("port")
.long("port")
let pg_port_arg = Arg::new("pg-port")
.long("pg-port")
.required(false)
.value_parser(value_parser!(u16))
.value_name("port");
.value_name("pg-port");

let http_port_arg = Arg::new("http-port")
.long("http-port")
.required(false)
.value_parser(value_parser!(u16))
.value_name("http-port");

let safekeepers_arg = Arg::new("safekeepers")
.long("safekeepers")
.required(false)
.value_name("safekeepers");

let stop_mode_arg = Arg::new("stop-mode")
.short('m')
Expand Down Expand Up @@ -1093,7 +1130,8 @@ fn cli() -> Command {
.arg(branch_name_arg.clone())
.arg(tenant_id_arg.clone())
.arg(lsn_arg.clone())
.arg(port_arg.clone())
.arg(pg_port_arg.clone())
.arg(http_port_arg.clone())
.arg(
Arg::new("config-only")
.help("Don't do basebackup, create endpoint directory with only config files")
Expand All @@ -1109,9 +1147,11 @@ fn cli() -> Command {
.arg(branch_name_arg)
.arg(timeline_id_arg)
.arg(lsn_arg)
.arg(port_arg)
.arg(pg_port_arg)
.arg(http_port_arg)
.arg(pg_version_arg)
.arg(hot_standby_arg)
.arg(safekeepers_arg)
)
.subcommand(
Command::new("stop")
Expand Down
Loading