diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs
index 2f515c9bf1d4..c6cfde1d1a4d 100644
--- a/compute_tools/src/bin/compute_ctl.rs
+++ b/compute_tools/src/bin/compute_ctl.rs
@@ -59,6 +59,9 @@ fn main() -> Result<()> {
     let matches = cli().get_matches();
 
+    let http_port = *matches
+        .get_one::<u16>("http-port")
+        .expect("http-port is required");
     let pgdata = matches
         .get_one::<String>("pgdata")
         .expect("PGDATA path is required");
@@ -178,7 +181,8 @@ fn main() -> Result<()> {
 
     // Launch http service first, so we were able to serve control-plane
     // requests, while configuration is still in progress.
-    let _http_handle = launch_http_server(&compute).expect("cannot launch http endpoint thread");
+    let _http_handle =
+        launch_http_server(http_port, &compute).expect("cannot launch http endpoint thread");
 
     if !spec_set {
         // No spec provided, hang waiting for it.
@@ -286,6 +290,14 @@ fn cli() -> clap::Command {
     let version = option_env!("CARGO_PKG_VERSION").unwrap_or("unknown");
     clap::Command::new("compute_ctl")
        .version(version)
+        .arg(
+            Arg::new("http-port")
+                .long("http-port")
+                .value_name("HTTP_PORT")
+                .default_value("3080")
+                .value_parser(clap::value_parser!(u16))
+                .required(false),
+        )
        .arg(
            Arg::new("connstr")
                .short('C')
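For context, a minimal standalone sketch (assuming clap 4.x, which the `value_parser!` call above implies; not part of the patch) of how the new `--http-port` flag behaves: when the flag is omitted, `get_one` returns the default of 3080, the port that `compute_ctl` previously hard-coded.

```rust
fn main() {
    let cmd = clap::Command::new("compute_ctl").arg(
        clap::Arg::new("http-port")
            .long("http-port")
            .default_value("3080")
            .value_parser(clap::value_parser!(u16)),
    );
    // No --http-port on the command line: the default applies.
    let matches = cmd.get_matches_from(["compute_ctl"]);
    let http_port = *matches.get_one::<u16>("http-port").expect("has a default");
    assert_eq!(http_port, 3080);
}
```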
diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs
index a7746629a858..617b330704d7 100644
--- a/compute_tools/src/compute.rs
+++ b/compute_tools/src/compute.rs
@@ -1,19 +1,3 @@
-//
-// XXX: This starts to be scarry similar to the `PostgresNode` from `control_plane`,
-// but there are several things that makes `PostgresNode` usage inconvenient in the
-// cloud:
-// - it inherits from `LocalEnv`, which contains **all-all** the information about
-//   a complete service running
-// - it uses `PageServerNode` with information about http endpoint, which we do not
-//   need in the cloud again
-// - many tiny pieces like, for example, we do not use `pg_ctl` in the cloud
-//
-// Thus, to use `PostgresNode` in the cloud, we need to 'mock' a bunch of required
-// attributes (not required for the cloud). Yet, it is still tempting to unify these
-// `PostgresNode` and `ComputeNode` and use one in both places.
-//
-// TODO: stabilize `ComputeNode` and think about using it in the `control_plane`.
-//
 use std::fs;
 use std::os::unix::fs::PermissionsExt;
 use std::path::Path;
@@ -106,26 +90,38 @@ pub struct ParsedSpec {
 impl TryFrom<ComputeSpec> for ParsedSpec {
     type Error = String;
     fn try_from(spec: ComputeSpec) -> Result<Self, String> {
+        // Extract the options from the spec file that are needed to connect to
+        // the storage system.
+        //
+        // For backwards-compatibility, the top-level fields in the spec file
+        // may be empty. In that case, we need to dig them from the GUCs in the
+        // cluster.settings field.
         let pageserver_connstr = spec
-            .cluster
-            .settings
-            .find("neon.pageserver_connstring")
+            .pageserver_connstring
+            .clone()
+            .or_else(|| spec.cluster.settings.find("neon.pageserver_connstring"))
             .ok_or("pageserver connstr should be provided")?;
         let storage_auth_token = spec.storage_auth_token.clone();
-        let tenant_id: TenantId = spec
-            .cluster
-            .settings
-            .find("neon.tenant_id")
-            .ok_or("tenant id should be provided")
-            .map(|s| TenantId::from_str(&s))?
-            .or(Err("invalid tenant id"))?;
-        let timeline_id: TimelineId = spec
-            .cluster
-            .settings
-            .find("neon.timeline_id")
-            .ok_or("timeline id should be provided")
-            .map(|s| TimelineId::from_str(&s))?
-            .or(Err("invalid timeline id"))?;
+        let tenant_id: TenantId = if let Some(tenant_id) = spec.tenant_id {
+            tenant_id
+        } else {
+            spec.cluster
+                .settings
+                .find("neon.tenant_id")
+                .ok_or("tenant id should be provided")
+                .map(|s| TenantId::from_str(&s))?
+                .or(Err("invalid tenant id"))?
+        };
+        let timeline_id: TimelineId = if let Some(timeline_id) = spec.timeline_id {
+            timeline_id
+        } else {
+            spec.cluster
+                .settings
+                .find("neon.timeline_id")
+                .ok_or("timeline id should be provided")
+                .map(|s| TimelineId::from_str(&s))?
+                .or(Err("invalid timeline id"))?
+        };
 
         Ok(ParsedSpec {
             spec,
@@ -295,8 +291,8 @@ impl ComputeNode {
         update_pg_hba(pgdata_path)?;
 
         match spec.mode {
-            ComputeMode::Primary | ComputeMode::Static(..) => {}
-            ComputeMode::Replica => {
+            ComputeMode::Primary => {}
+            ComputeMode::Replica | ComputeMode::Static(..) => {
                 add_standby_signal(pgdata_path)?;
             }
         }
@@ -376,7 +372,7 @@ impl ComputeNode {
 
         info!(
             "finished configuration of compute for project {}",
-            spec.cluster.cluster_id
+            spec.cluster.cluster_id.as_deref().unwrap_or("None")
         );
 
         Ok(())
@@ -434,7 +430,7 @@ impl ComputeNode {
         let spec = compute_state.pspec.as_ref().expect("spec must be set");
         info!(
             "starting compute for project {}, operation {}, tenant {}, timeline {}",
-            spec.spec.cluster.cluster_id,
+            spec.spec.cluster.cluster_id.as_deref().unwrap_or("None"),
             spec.spec.operation_uuid.as_deref().unwrap_or("None"),
             spec.tenant_id,
             spec.timeline_id,
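To make the backwards-compatibility rule above concrete, here is a hedged sketch (a hypothetical unit test, not part of the patch; the IDs and connection strings are made up) of the two spec formats that `ParsedSpec::try_from` now accepts:

```rust
#[test]
fn parsed_spec_accepts_both_spec_formats() {
    // New-style: storage options as top-level fields.
    let new_style = serde_json::json!({
        "format_version": 1.0,
        "cluster": { "roles": [], "databases": [] },
        "tenant_id": "c9fd8ec767d1596dc8a024f5f6f51d2c",
        "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
        "pageserver_connstring": "postgresql://no_user@127.0.0.1:64000",
    });
    // Old-style: the same options passed as GUCs in cluster.settings.
    let old_style = serde_json::json!({
        "format_version": 1.0,
        "cluster": {
            "roles": [],
            "databases": [],
            "settings": [
                {"name": "neon.tenant_id", "value": "c9fd8ec767d1596dc8a024f5f6f51d2c", "vartype": "string"},
                {"name": "neon.timeline_id", "value": "f15ae0cf21cce2ba27e4d80c6709a6cd", "vartype": "string"},
                {"name": "neon.pageserver_connstring", "value": "postgresql://no_user@127.0.0.1:64000", "vartype": "string"},
            ],
        },
    });
    for json in [new_style, old_style] {
        let spec: ComputeSpec = serde_json::from_value(json).unwrap();
        assert!(ParsedSpec::try_from(spec).is_ok());
    }
}
```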
diff --git a/compute_tools/src/config.rs b/compute_tools/src/config.rs
index 1168f3876af3..99346433d062 100644
--- a/compute_tools/src/config.rs
+++ b/compute_tools/src/config.rs
@@ -5,6 +5,7 @@ use std::path::Path;
 
 use anyhow::Result;
 
+use crate::pg_helpers::escape_conf_value;
 use crate::pg_helpers::PgOptionsSerialize;
 use compute_api::spec::{ComputeMode, ComputeSpec};
 
@@ -36,10 +37,44 @@ pub fn write_postgres_conf(path: &Path, spec: &ComputeSpec) -> Result<()> {
     // File::create() destroys the file content if it exists.
     let mut file = File::create(path)?;
 
-    writeln!(file, "# Managed by compute_ctl: begin")?;
+    // Write the postgresql.conf content from the spec file as is.
+    if let Some(conf) = &spec.cluster.postgresql_conf {
+        writeln!(file, "{}", conf)?;
+    }
 
-    write!(file, "{}", &spec.cluster.settings.as_pg_settings())?;
+    // Add options for connecting to storage
+    writeln!(file, "# Neon storage settings")?;
+    if let Some(s) = &spec.pageserver_connstring {
+        writeln!(
+            file,
+            "neon.pageserver_connstring='{}'",
+            escape_conf_value(s)
+        )?;
+    }
+    if !spec.safekeeper_connstrings.is_empty() {
+        writeln!(
+            file,
+            "neon.safekeepers='{}'",
+            escape_conf_value(&spec.safekeeper_connstrings.join(","))
+        )?;
+    }
+    if let Some(s) = &spec.tenant_id {
+        writeln!(
+            file,
+            "neon.tenant_id='{}'",
+            escape_conf_value(&s.to_string())
+        )?;
+    }
+    if let Some(s) = &spec.timeline_id {
+        writeln!(
+            file,
+            "neon.timeline_id='{}'",
+            escape_conf_value(&s.to_string())
+        )?;
+    }
+
     match spec.mode {
         ComputeMode::Primary => {}
         ComputeMode::Static(lsn) => {
@@ -53,7 +88,12 @@ pub fn write_postgres_conf(path: &Path, spec: &ComputeSpec) -> Result<()> {
         }
     }
 
-    writeln!(file, "# Managed by compute_ctl: end")?;
+    // If there are any extra options in the 'settings' field, append those
+    if spec.cluster.settings.is_some() {
+        writeln!(file, "# Managed by compute_ctl: begin")?;
+        write!(file, "{}", spec.cluster.settings.as_pg_settings())?;
+        writeln!(file, "# Managed by compute_ctl: end")?;
+    }
 
     Ok(())
 }
diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs
index 4468f6f5e49a..afd9c2fb5479 100644
--- a/compute_tools/src/http/api.rs
+++ b/compute_tools/src/http/api.rs
@@ -220,8 +220,8 @@ fn render_json_error(e: &str, status: StatusCode) -> Response<Body> {
 
 // Main Hyper HTTP server function that runs it and blocks waiting on it forever.
 #[tokio::main]
-async fn serve(state: Arc<ComputeNode>) {
-    let addr = SocketAddr::from(([0, 0, 0, 0], 3080));
+async fn serve(port: u16, state: Arc<ComputeNode>) {
+    let addr = SocketAddr::from(([0, 0, 0, 0], port));
 
     let make_service = make_service_fn(move |_conn| {
         let state = state.clone();
@@ -256,10 +256,10 @@ async fn serve(state: Arc<ComputeNode>) {
 }
 
 /// Launch a separate Hyper HTTP API server thread and return its `JoinHandle`.
-pub fn launch_http_server(state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
+pub fn launch_http_server(port: u16, state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
     let state = Arc::clone(state);
 
     Ok(thread::Builder::new()
         .name("http-endpoint".into())
-        .spawn(move || serve(state))?)
+        .spawn(move || serve(port, state))?)
 }
diff --git a/compute_tools/src/pg_helpers.rs b/compute_tools/src/pg_helpers.rs
index ed00485d5ac3..d5c845e9eaae 100644
--- a/compute_tools/src/pg_helpers.rs
+++ b/compute_tools/src/pg_helpers.rs
@@ -23,7 +23,7 @@ fn escape_literal(s: &str) -> String {
 
 /// Escape a string so that it can be used in postgresql.conf.
 /// Same as escape_literal, currently.
-fn escape_conf_value(s: &str) -> String {
+pub fn escape_conf_value(s: &str) -> String {
     s.replace('\'', "''").replace('\\', "\\\\")
 }
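Since `escape_conf_value` is now `pub` and used for every storage setting written above, here is a quick self-contained sketch of its behavior (the `main` wrapper is illustrative, not part of the patch): single quotes are doubled, then backslashes are doubled, which is what postgresql.conf expects inside a quoted value.

```rust
fn escape_conf_value(s: &str) -> String {
    s.replace('\'', "''").replace('\\', "\\\\")
}

fn main() {
    assert_eq!(escape_conf_value("it's"), "it''s");
    assert_eq!(escape_conf_value(r"C:\path"), r"C:\\path");
    // Produces e.g.: neon.safekeepers='127.0.0.1:5454,127.0.0.1:5455'
    println!(
        "neon.safekeepers='{}'",
        escape_conf_value("127.0.0.1:5454,127.0.0.1:5455")
    );
}
```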
diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs
index 39551642c0b6..52af936d7b7d 100644
--- a/control_plane/src/bin/neon_local.rs
+++ b/control_plane/src/bin/neon_local.rs
@@ -476,10 +476,11 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
             println!("Creating endpoint for imported timeline ...");
             cplane.new_endpoint(
-                tenant_id,
                 name,
+                tenant_id,
                 timeline_id,
                 None,
+                None,
                 pg_version,
                 ComputeMode::Primary,
             )?;
@@ -591,7 +592,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
 
                 table.add_row([
                     endpoint_id.as_str(),
-                    &endpoint.address.to_string(),
+                    &endpoint.pg_address.to_string(),
                     &endpoint.timeline_id.to_string(),
                     branch_name,
                     lsn_str.as_str(),
@@ -620,8 +621,8 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
                 .get_branch_timeline_id(branch_name, tenant_id)
                 .ok_or_else(|| anyhow!("Found no timeline id for branch name '{branch_name}'"))?;
 
-            let port: Option<u16> = sub_args.get_one::<u16>("port").copied();
-
+            let pg_port: Option<u16> = sub_args.get_one::<u16>("pg-port").copied();
+            let http_port: Option<u16> = sub_args.get_one::<u16>("http-port").copied();
             let pg_version = sub_args
                 .get_one::<u32>("pg-version")
                 .copied()
@@ -639,14 +640,38 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
                 (Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
             };
 
-            cplane.new_endpoint(tenant_id, &endpoint_id, timeline_id, port, pg_version, mode)?;
+            cplane.new_endpoint(
+                &endpoint_id,
+                tenant_id,
+                timeline_id,
+                pg_port,
+                http_port,
+                pg_version,
+                mode,
+            )?;
         }
         "start" => {
-            let port: Option<u16> = sub_args.get_one::<u16>("port").copied();
+            let pg_port: Option<u16> = sub_args.get_one::<u16>("pg-port").copied();
+            let http_port: Option<u16> = sub_args.get_one::<u16>("http-port").copied();
             let endpoint_id = sub_args
                 .get_one::<String>("endpoint_id")
                 .ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?;
 
+            // If --safekeepers argument is given, use only the listed safekeeper nodes.
+            let safekeepers =
+                if let Some(safekeepers_str) = sub_args.get_one::<String>("safekeepers") {
+                    let mut safekeepers: Vec<NodeId> = Vec::new();
+                    for sk_id in safekeepers_str.split(',').map(str::trim) {
+                        let sk_id = NodeId(u64::from_str(sk_id).map_err(|_| {
+                            anyhow!("invalid node ID \"{sk_id}\" in --safekeepers list")
+                        })?);
+                        safekeepers.push(sk_id);
+                    }
+                    safekeepers
+                } else {
+                    env.safekeepers.iter().map(|sk| sk.id).collect()
+                };
+
             let endpoint = cplane.endpoints.get(endpoint_id.as_str());
 
             let auth_token = if matches!(env.pageserver.pg_auth_type, AuthType::NeonJWT) {
@@ -673,7 +698,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
                     _ => {}
                 }
                 println!("Starting existing endpoint {endpoint_id}...");
-                endpoint.start(&auth_token)?;
+                endpoint.start(&auth_token, safekeepers)?;
             } else {
                 let branch_name = sub_args
                     .get_one::<String>("branch-name")
@@ -709,14 +734,15 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
                 println!("Starting new endpoint {endpoint_id} (PostgreSQL v{pg_version}) on timeline {timeline_id} ...");
 
                 let ep = cplane.new_endpoint(
-                    tenant_id,
                     endpoint_id,
+                    tenant_id,
                     timeline_id,
-                    port,
+                    pg_port,
+                    http_port,
                     pg_version,
                     mode,
                 )?;
-                ep.start(&auth_token)?;
+                ep.start(&auth_token, safekeepers)?;
             }
         }
         "stop" => {
@@ -944,11 +970,22 @@ fn cli() -> Command {
         .value_parser(value_parser!(u32))
         .default_value(DEFAULT_PG_VERSION);
 
-    let port_arg = Arg::new("port")
-        .long("port")
+    let pg_port_arg = Arg::new("pg-port")
+        .long("pg-port")
         .required(false)
         .value_parser(value_parser!(u16))
-        .value_name("port");
+        .value_name("pg-port");
+
+    let http_port_arg = Arg::new("http-port")
+        .long("http-port")
+        .required(false)
+        .value_parser(value_parser!(u16))
+        .value_name("http-port");
+
+    let safekeepers_arg = Arg::new("safekeepers")
+        .long("safekeepers")
+        .required(false)
+        .value_name("safekeepers");
 
     let stop_mode_arg = Arg::new("stop-mode")
         .short('m')
@@ -1093,7 +1130,8 @@ fn cli() -> Command {
                 .arg(branch_name_arg.clone())
                 .arg(tenant_id_arg.clone())
                 .arg(lsn_arg.clone())
-                .arg(port_arg.clone())
+                .arg(pg_port_arg.clone())
+                .arg(http_port_arg.clone())
                 .arg(
                     Arg::new("config-only")
                         .help("Don't do basebackup, create endpoint directory with only config files")
@@ -1109,9 +1147,11 @@ fn cli() -> Command {
                 .arg(branch_name_arg)
                 .arg(timeline_id_arg)
                 .arg(lsn_arg)
-                .arg(port_arg)
+                .arg(pg_port_arg)
+                .arg(http_port_arg)
                 .arg(pg_version_arg)
                 .arg(hot_standby_arg)
+                .arg(safekeepers_arg)
         )
         .subcommand(
             Command::new("stop")
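A standalone sketch of the `--safekeepers` parsing introduced above (the `NodeId` newtype is simplified here for illustration; in the patch it comes from `utils::id`):

```rust
use std::str::FromStr;

#[derive(Debug, PartialEq)]
struct NodeId(u64);

// Parse a comma-separated list of safekeeper node IDs, trimming whitespace
// around each entry, as the "start" subcommand does.
fn parse_safekeepers(arg: &str) -> Result<Vec<NodeId>, String> {
    arg.split(',')
        .map(str::trim)
        .map(|id| {
            u64::from_str(id)
                .map(NodeId)
                .map_err(|_| format!("invalid node ID \"{id}\" in --safekeepers list"))
        })
        .collect()
}

fn main() {
    assert_eq!(
        parse_safekeepers("1, 2,3").unwrap(),
        vec![NodeId(1), NodeId(2), NodeId(3)]
    );
    assert!(parse_safekeepers("1,x").is_err());
}
```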
diff --git a/control_plane/src/broker.rs b/control_plane/src/broker.rs
index 6c0604a0765e..ad19dfa2049a 100644
--- a/control_plane/src/broker.rs
+++ b/control_plane/src/broker.rs
@@ -1,3 +1,9 @@
+//! Code to manage the storage broker
+//!
+//! In the local test environment, a single storage broker process is shared
+//! by all the safekeepers and pageservers; it doesn't store any persistent
+//! data of its own.
+//!
 use anyhow::Context;
 use std::path::PathBuf;
 
diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs
index cc5a7a416890..b28315a35deb 100644
--- a/control_plane/src/endpoint.rs
+++ b/control_plane/src/endpoint.rs
@@ -1,40 +1,71 @@
+//! Code to manage compute endpoints
+//!
+//! In the local test environment, the data for each endpoint is stored in
+//!
+//!   .neon/endpoints/<endpoint id>
+//!
+//! Some basic information about the endpoint, like the tenant and timeline IDs,
+//! is stored in the `endpoint.json` file. The `endpoint.json` file is created
+//! when the endpoint is created, and doesn't change afterwards.
+//!
+//! The endpoint is managed by the `compute_ctl` binary. When an endpoint is
+//! started, we launch `compute_ctl`. It synchronizes the safekeepers, downloads
+//! the basebackup from the pageserver to initialize the data directory, and
+//! finally launches the PostgreSQL process. It watches the PostgreSQL process
+//! until it exits.
+//!
+//! When an endpoint is created, a `postgresql.conf` file is also created in
+//! the endpoint's directory. The file can be modified before starting PostgreSQL.
+//! However, the `postgresql.conf` file in the endpoint directory is not used directly
+//! by PostgreSQL. It is passed to `compute_ctl`, and `compute_ctl` writes another
+//! copy of it in the data directory.
+//!
+//! Directory contents:
+//!
+//! ```ignore
+//! .neon/endpoints/main/
+//!     compute.log          - log output of `compute_ctl` and `postgres`
+//!     endpoint.json        - serialized `EndpointConf` struct
+//!     postgresql.conf      - postgresql settings
+//!     spec.json            - passed to `compute_ctl`
+//!     pgdata/
+//!         postgresql.conf  - copy of postgresql.conf created by `compute_ctl`
+//!         zenith.signal
+//!
+//! ```
+//!
 use std::collections::BTreeMap;
-use std::fs::{self, File};
-use std::io::Write;
 use std::net::SocketAddr;
 use std::net::TcpStream;
-use std::os::unix::fs::PermissionsExt;
 use std::path::PathBuf;
-use std::process::{Command, Stdio};
-use std::str::FromStr;
+use std::process::Command;
 use std::sync::Arc;
 use std::time::Duration;
 
-use anyhow::{Context, Result};
+use anyhow::{anyhow, bail, Context, Result};
 use serde::{Deserialize, Serialize};
 use serde_with::{serde_as, DisplayFromStr};
-use utils::{
-    id::{TenantId, TimelineId},
-    lsn::Lsn,
-};
+use utils::id::{NodeId, TenantId, TimelineId};
 
 use crate::local_env::LocalEnv;
 use crate::pageserver::PageServerNode;
 use crate::postgresql_conf::PostgresConf;
-use compute_api::spec::ComputeMode;
+use compute_api::responses::{ComputeState, ComputeStatus};
+use compute_api::spec::{Cluster, ComputeMode, ComputeSpec};
 
 // contents of a endpoint.json file
 #[serde_as]
 #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
 pub struct EndpointConf {
-    name: String,
+    endpoint_id: String,
     #[serde_as(as = "DisplayFromStr")]
     tenant_id: TenantId,
     #[serde_as(as = "DisplayFromStr")]
     timeline_id: TimelineId,
     mode: ComputeMode,
-    port: u16,
+    pg_port: u16,
+    http_port: u16,
     pg_version: u32,
 }
 
@@ -57,11 +88,11 @@ impl ComputeControlPlane {
         let pageserver = Arc::new(PageServerNode::from_env(&env));
 
         let mut endpoints = BTreeMap::default();
-        for endpoint_dir in fs::read_dir(env.endpoints_path())
+        for endpoint_dir in std::fs::read_dir(env.endpoints_path())
             .with_context(|| format!("failed to list {}", env.endpoints_path().display()))?
         {
             let ep = Endpoint::from_dir_entry(endpoint_dir?, &env, &pageserver)?;
-            endpoints.insert(ep.name.clone(), Arc::new(ep));
+            endpoints.insert(ep.endpoint_id.clone(), Arc::new(ep));
         }
 
         Ok(ComputeControlPlane {
@@ -76,25 +107,28 @@ impl ComputeControlPlane {
         1 + self
             .endpoints
             .values()
-            .map(|ep| ep.address.port())
+            .map(|ep| std::cmp::max(ep.pg_address.port(), ep.http_address.port()))
             .max()
             .unwrap_or(self.base_port)
     }
 
+    #[allow(clippy::too_many_arguments)]
     pub fn new_endpoint(
         &mut self,
+        endpoint_id: &str,
         tenant_id: TenantId,
-        name: &str,
         timeline_id: TimelineId,
-        port: Option<u16>,
+        pg_port: Option<u16>,
+        http_port: Option<u16>,
         pg_version: u32,
         mode: ComputeMode,
     ) -> Result<Arc<Endpoint>> {
-        let port = port.unwrap_or_else(|| self.get_port());
-
+        let pg_port = pg_port.unwrap_or_else(|| self.get_port());
+        let http_port = http_port.unwrap_or_else(|| self.get_port() + 1);
         let ep = Arc::new(Endpoint {
-            name: name.to_owned(),
-            address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
+            endpoint_id: endpoint_id.to_owned(),
+            pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), pg_port),
+            http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), http_port),
             env: self.env.clone(),
             pageserver: Arc::clone(&self.pageserver),
             timeline_id,
@@ -102,21 +136,27 @@ impl ComputeControlPlane {
             tenant_id,
             pg_version,
         });
-        ep.create_pgdata()?;
+
+        ep.create_endpoint_dir()?;
         std::fs::write(
             ep.endpoint_path().join("endpoint.json"),
             serde_json::to_string_pretty(&EndpointConf {
-                name: name.to_string(),
+                endpoint_id: endpoint_id.to_string(),
                 tenant_id,
                 timeline_id,
                 mode,
-                port,
+                http_port,
+                pg_port,
                 pg_version,
             })?,
         )?;
-        ep.setup_pg_conf()?;
+        std::fs::write(
+            ep.endpoint_path().join("postgresql.conf"),
+            ep.setup_pg_conf()?.to_string(),
+        )?;
 
-        self.endpoints.insert(ep.name.clone(), Arc::clone(&ep));
+        self.endpoints
+            .insert(ep.endpoint_id.clone(), Arc::clone(&ep));
 
         Ok(ep)
     }
@@ -127,13 +167,15 @@ impl ComputeControlPlane {
 #[derive(Debug)]
 pub struct Endpoint {
     /// used as the directory name
-    name: String,
+    endpoint_id: String,
     pub tenant_id: TenantId,
     pub timeline_id: TimelineId,
     pub mode: ComputeMode,
 
-    // port and address of the Postgres server
-    pub address: SocketAddr,
+    // port and address of the Postgres server and `compute_ctl`'s HTTP API
+    pub pg_address: SocketAddr,
+    pub http_address: SocketAddr,
+
     // postgres major version in the format: 14, 15, etc.
     pg_version: u32,
 
@@ -158,16 +200,16 @@ impl Endpoint {
 
         // parse data directory name
         let fname = entry.file_name();
-        let name = fname.to_str().unwrap().to_string();
+        let endpoint_id = fname.to_str().unwrap().to_string();
 
         // Read the endpoint.json file
         let conf: EndpointConf =
             serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;
 
-        // ok now
         Ok(Endpoint {
-            address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.port),
-            name,
+            pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.pg_port),
+            http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.http_port),
+            endpoint_id,
             env: env.clone(),
             pageserver: Arc::clone(pageserver),
             timeline_id: conf.timeline_id,
@@ -177,104 +219,17 @@ impl Endpoint {
         })
     }
 
-    fn sync_safekeepers(&self, auth_token: &Option<String>, pg_version: u32) -> Result<Lsn> {
-        let pg_path = self.env.pg_bin_dir(pg_version)?.join("postgres");
-        let mut cmd = Command::new(pg_path);
-
-        cmd.arg("--sync-safekeepers")
-            .env_clear()
-            .env(
-                "LD_LIBRARY_PATH",
-                self.env.pg_lib_dir(pg_version)?.to_str().unwrap(),
-            )
-            .env(
-                "DYLD_LIBRARY_PATH",
-                self.env.pg_lib_dir(pg_version)?.to_str().unwrap(),
-            )
-            .env("PGDATA", self.pgdata().to_str().unwrap())
-            .stdout(Stdio::piped())
-            // Comment this to avoid capturing stderr (useful if command hangs)
-            .stderr(Stdio::piped());
-
-        if let Some(token) = auth_token {
-            cmd.env("NEON_AUTH_TOKEN", token);
-        }
-
-        let sync_handle = cmd
-            .spawn()
-            .expect("postgres --sync-safekeepers failed to start");
-
-        let sync_output = sync_handle
-            .wait_with_output()
-            .expect("postgres --sync-safekeepers failed");
-        if !sync_output.status.success() {
-            anyhow::bail!(
-                "sync-safekeepers failed: '{}'",
-                String::from_utf8_lossy(&sync_output.stderr)
-            );
-        }
-
-        let lsn = Lsn::from_str(std::str::from_utf8(&sync_output.stdout)?.trim())?;
-        println!("Safekeepers synced on {}", lsn);
-        Ok(lsn)
-    }
-
-    /// Get basebackup from the pageserver as a tar archive and extract it
-    /// to the `self.pgdata()` directory.
-    fn do_basebackup(&self, lsn: Option<Lsn>) -> Result<()> {
-        println!(
-            "Extracting base backup to create postgres instance: path={} port={}",
-            self.pgdata().display(),
-            self.address.port()
-        );
-
-        let sql = if let Some(lsn) = lsn {
-            format!("basebackup {} {} {}", self.tenant_id, self.timeline_id, lsn)
-        } else {
-            format!("basebackup {} {}", self.tenant_id, self.timeline_id)
-        };
-
-        let mut client = self
-            .pageserver
-            .page_server_psql_client()
-            .context("connecting to page server failed")?;
-
-        let copyreader = client
-            .copy_out(sql.as_str())
-            .context("page server 'basebackup' command failed")?;
-
-        // Read the archive directly from the `CopyOutReader`
-        //
-        // Set `ignore_zeros` so that unpack() reads all the Copy data and
-        // doesn't stop at the end-of-archive marker. Otherwise, if the server
-        // sends an Error after finishing the tarball, we will not notice it.
-        let mut ar = tar::Archive::new(copyreader);
-        ar.set_ignore_zeros(true);
-        ar.unpack(&self.pgdata())
-            .context("extracting base backup failed")?;
-
-        Ok(())
-    }
-
-    fn create_pgdata(&self) -> Result<()> {
-        fs::create_dir_all(self.pgdata()).with_context(|| {
+    fn create_endpoint_dir(&self) -> Result<()> {
+        std::fs::create_dir_all(self.endpoint_path()).with_context(|| {
             format!(
-                "could not create data directory {}",
-                self.pgdata().display()
+                "could not create endpoint directory {}",
+                self.endpoint_path().display()
             )
-        })?;
-        fs::set_permissions(self.pgdata().as_path(), fs::Permissions::from_mode(0o700))
-            .with_context(|| {
-                format!(
-                    "could not set permissions in data directory {}",
-                    self.pgdata().display()
-                )
-            })
+        })
     }
 
-    // Write postgresql.conf with default configuration
-    // and PG_VERSION file to the data directory of a new endpoint.
-    fn setup_pg_conf(&self) -> Result<()> {
+    // Generate postgresql.conf with default configuration
+    fn setup_pg_conf(&self) -> Result<PostgresConf> {
         let mut conf = PostgresConf::new();
         conf.append("max_wal_senders", "10");
         conf.append("wal_log_hints", "off");
@@ -287,25 +242,14 @@ impl Endpoint {
         // wal_sender_timeout is the maximum time to wait for WAL replication.
         // It also defines how often the walreciever will send a feedback message to the wal sender.
         conf.append("wal_sender_timeout", "5s");
-        conf.append("listen_addresses", &self.address.ip().to_string());
-        conf.append("port", &self.address.port().to_string());
+        conf.append("listen_addresses", &self.pg_address.ip().to_string());
+        conf.append("port", &self.pg_address.port().to_string());
         conf.append("wal_keep_size", "0");
         // walproposer panics when basebackup is invalid, it is pointless to restart in this case.
         conf.append("restart_after_crash", "off");
 
-        // Configure the Neon Postgres extension to fetch pages from pageserver
-        let pageserver_connstr = {
-            let config = &self.pageserver.pg_connection_config;
-            let (host, port) = (config.host(), config.port());
-
-            // NOTE: avoid spaces in connection string, because it is less error prone if we forward it somewhere.
-            format!("postgresql://no_user@{host}:{port}")
-        };
+        // Load the 'neon' extension
         conf.append("shared_preload_libraries", "neon");
-        conf.append_line("");
-        conf.append("neon.pageserver_connstring", &pageserver_connstr);
-        conf.append("neon.tenant_id", &self.tenant_id.to_string());
-        conf.append("neon.timeline_id", &self.timeline_id.to_string());
+
         conf.append_line("");
 
         // Replication-related configurations, such as WAL sending
@@ -390,46 +334,11 @@ impl Endpoint {
             }
         }
 
-        let mut file = File::create(self.pgdata().join("postgresql.conf"))?;
-        file.write_all(conf.to_string().as_bytes())?;
-
-        let mut file = File::create(self.pgdata().join("PG_VERSION"))?;
-        file.write_all(self.pg_version.to_string().as_bytes())?;
-
-        Ok(())
-    }
-
-    fn load_basebackup(&self, auth_token: &Option<String>) -> Result<()> {
-        let backup_lsn = match &self.mode {
-            ComputeMode::Primary => {
-                if !self.env.safekeepers.is_empty() {
-                    // LSN 0 means that it is bootstrap and we need to download just
-                    // latest data from the pageserver. That is a bit clumsy but whole bootstrap
-                    // procedure evolves quite actively right now, so let's think about it again
-                    // when things would be more stable (TODO).
-                    let lsn = self.sync_safekeepers(auth_token, self.pg_version)?;
-                    if lsn == Lsn(0) {
-                        None
-                    } else {
-                        Some(lsn)
-                    }
-                } else {
-                    None
-                }
-            }
-            ComputeMode::Static(lsn) => Some(*lsn),
-            ComputeMode::Replica => {
-                None // Take the latest snapshot available to start with
-            }
-        };
-
-        self.do_basebackup(backup_lsn)?;
-
-        Ok(())
+        Ok(conf)
     }
 
     pub fn endpoint_path(&self) -> PathBuf {
-        self.env.endpoints_path().join(&self.name)
+        self.env.endpoints_path().join(&self.endpoint_id)
     }
 
     pub fn pgdata(&self) -> PathBuf {
@@ -439,7 +348,7 @@ impl Endpoint {
     pub fn status(&self) -> &str {
         let timeout = Duration::from_millis(300);
         let has_pidfile = self.pgdata().join("postmaster.pid").exists();
-        let can_connect = TcpStream::connect_timeout(&self.address, timeout).is_ok();
+        let can_connect = TcpStream::connect_timeout(&self.pg_address, timeout).is_ok();
 
         match (has_pidfile, can_connect) {
             (true, true) => "running",
@@ -457,8 +366,6 @@ impl Endpoint {
             &[
                 "-D",
                 self.pgdata().to_str().unwrap(),
-                "-l",
-                self.pgdata().join("pg.log").to_str().unwrap(),
                 "-w", //wait till pg_ctl actually does what was asked
             ],
             args,
@@ -494,36 +401,183 @@ impl Endpoint {
         Ok(())
     }
 
-    pub fn start(&self, auth_token: &Option<String>) -> Result<()> {
+    pub fn start(&self, auth_token: &Option<String>, safekeepers: Vec<NodeId>) -> Result<()> {
         if self.status() == "running" {
             anyhow::bail!("The endpoint is already running");
         }
 
-        // 1. We always start Postgres from scratch, so
-        // if old dir exists, preserve 'postgresql.conf' and drop the directory
-        let postgresql_conf_path = self.pgdata().join("postgresql.conf");
-        let postgresql_conf = fs::read(&postgresql_conf_path).with_context(|| {
-            format!(
-                "failed to read config file in {}",
-                postgresql_conf_path.to_str().unwrap()
-            )
-        })?;
-        fs::remove_dir_all(self.pgdata())?;
-        self.create_pgdata()?;
+        // Slurp the endpoints/<endpoint id>/postgresql.conf file into
+        // memory. We will include it in the spec file that we pass to
+        // `compute_ctl`, and `compute_ctl` will write it to the postgresql.conf
+        // in the data directory.
+        let postgresql_conf_path = self.endpoint_path().join("postgresql.conf");
+        let postgresql_conf = match std::fs::read(&postgresql_conf_path) {
+            Ok(content) => String::from_utf8(content)?,
+            Err(e) if e.kind() == std::io::ErrorKind::NotFound => "".to_string(),
+            Err(e) => {
+                return Err(anyhow::Error::new(e).context(format!(
+                    "failed to read config file in {}",
+                    postgresql_conf_path.to_str().unwrap()
+                )))
+            }
+        };
 
-        // 2. Bring back config files
-        fs::write(&postgresql_conf_path, postgresql_conf)?;
+        // We always start the compute node from scratch, so if the Postgres
+        // data dir exists from a previous launch, remove it first.
+        if self.pgdata().exists() {
+            std::fs::remove_dir_all(self.pgdata())?;
+        }
 
-        // 3. Load basebackup
-        self.load_basebackup(auth_token)?;
+        let pageserver_connstring = {
+            let config = &self.pageserver.pg_connection_config;
+            let (host, port) = (config.host(), config.port());
 
+            // NOTE: avoid spaces in connection string, because it is less error prone if we forward it somewhere.
+            format!("postgresql://no_user@{host}:{port}")
+        };
+        let mut safekeeper_connstrings = Vec::new();
+        if self.mode == ComputeMode::Primary {
+            for sk_id in safekeepers {
+                let sk = self
+                    .env
+                    .safekeepers
+                    .iter()
+                    .find(|node| node.id == sk_id)
+                    .ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
+                safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.pg_port));
+            }
+        }
 
-        if self.mode != ComputeMode::Primary {
-            File::create(self.pgdata().join("standby.signal"))?;
-        }
+        // Create spec file
+        let spec = ComputeSpec {
+            format_version: 1.0,
+            operation_uuid: None,
+            cluster: Cluster {
+                cluster_id: None, // project ID: not used
+                name: None,       // project name: not used
+                state: None,
+                roles: vec![],
+                databases: vec![],
+                settings: None,
+                postgresql_conf: Some(postgresql_conf),
+            },
+            delta_operations: None,
+            tenant_id: Some(self.tenant_id),
+            timeline_id: Some(self.timeline_id),
+            mode: self.mode,
+            pageserver_connstring: Some(pageserver_connstring),
+            safekeeper_connstrings,
+            storage_auth_token: auth_token.clone(),
+        };
+        let spec_path = self.endpoint_path().join("spec.json");
+        std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
+
+        // Open log file. We'll redirect the stdout and stderr of `compute_ctl` to it.
+        let logfile = std::fs::OpenOptions::new()
+            .create(true)
+            .append(true)
+            .open(self.endpoint_path().join("compute.log"))?;
+
+        // Launch compute_ctl
+        println!("Starting postgres node at '{}'", self.connstr());
+        let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
+        cmd.args(["--http-port", &self.http_address.port().to_string()])
+            .args(["--pgdata", self.pgdata().to_str().unwrap()])
+            .args(["--connstr", &self.connstr()])
+            .args([
+                "--spec-path",
+                self.endpoint_path().join("spec.json").to_str().unwrap(),
+            ])
+            .args([
+                "--pgbin",
+                self.env
+                    .pg_bin_dir(self.pg_version)?
+                    .join("postgres")
+                    .to_str()
+                    .unwrap(),
+            ])
+            .stdin(std::process::Stdio::null())
+            .stderr(logfile.try_clone()?)
+            .stdout(logfile);
+        let _child = cmd.spawn()?;
+
+        // Wait for it to start
+        let mut attempt = 0;
+        const ATTEMPT_INTERVAL: Duration = Duration::from_millis(100);
+        const MAX_ATTEMPTS: u32 = 10 * 30; // Wait up to 30 s
+        loop {
+            attempt += 1;
+
+            match self.get_status() {
+                Ok(state) => {
+                    match state.status {
+                        ComputeStatus::Init => {
+                            if attempt == MAX_ATTEMPTS {
+                                bail!("compute startup timed out; still in Init state");
+                            }
+                            // keep retrying
+                        }
+                        ComputeStatus::Running => {
+                            // All good!
+                            break;
+                        }
+                        ComputeStatus::Failed => {
+                            bail!(
+                                "compute startup failed: {}",
+                                state.error.as_deref().unwrap_or("<unknown>")
+                            );
+                        }
+                        ComputeStatus::Empty
+                        | ComputeStatus::ConfigurationPending
+                        | ComputeStatus::Configuration => {
+                            bail!("unexpected compute status: {:?}", state.status)
+                        }
+                    }
+                }
+                Err(e) => {
+                    if attempt == MAX_ATTEMPTS {
+                        return Err(e)
+                            .context("timed out waiting to connect to the compute_ctl HTTP server");
+                    }
+                }
+            }
+            std::thread::sleep(ATTEMPT_INTERVAL);
+        }
 
-        // 4. Finally start postgres
-        println!("Starting postgres at '{}'", self.connstr());
-        self.pg_ctl(&["start"], auth_token)
+        Ok(())
+    }
+
+    // Call the /status HTTP API
+    pub fn get_status(&self) -> Result<ComputeState> {
+        let client = reqwest::blocking::Client::new();
+
+        let response = client
+            .request(
+                reqwest::Method::GET,
+                format!(
+                    "http://{}:{}/status",
+                    self.http_address.ip(),
+                    self.http_address.port()
+                ),
+            )
+            .send()?;
+
+        // Interpret the response
+        let status = response.status();
+        if !(status.is_client_error() || status.is_server_error()) {
+            Ok(response.json()?)
+        } else {
+            // reqwest does not export its error construction utility functions, so let's craft the message ourselves
+            let url = response.url().to_owned();
+            let msg = match response.text() {
+                Ok(err_body) => format!("Error: {}", err_body),
+                Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
+            };
+            Err(anyhow::anyhow!(msg))
+        }
     }
 
     pub fn stop(&self, destroy: bool) -> Result<()> {
@@ -540,7 +594,7 @@ impl Endpoint {
                 "Destroying postgres data directory '{}'",
                 self.pgdata().to_str().unwrap()
             );
-            fs::remove_dir_all(self.endpoint_path())?;
+            std::fs::remove_dir_all(self.endpoint_path())?;
         } else {
             self.pg_ctl(&["stop"], &None)?;
         }
@@ -549,10 +603,10 @@ impl Endpoint {
 
     pub fn connstr(&self) -> String {
         format!(
-            "host={} port={} user={} dbname={}",
-            self.address.ip(),
-            self.address.port(),
+            "postgresql://{}@{}:{}/{}",
             "cloud_admin",
+            self.pg_address.ip(),
+            self.pg_address.port(),
             "postgres"
         )
     }
diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs
index 9286944412dd..df70cb313928 100644
--- a/control_plane/src/local_env.rs
+++ b/control_plane/src/local_env.rs
@@ -37,7 +37,7 @@ pub const DEFAULT_PG_VERSION: u32 = 15;
 #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
 pub struct LocalEnv {
     // Base directory for all the nodes (the pageserver, safekeepers and
-    // compute nodes).
+    // compute endpoints).
     //
     // This is not stored in the config file. Rather, this is the path where the
     // config file itself is. It is read from the NEON_REPO_DIR env variable or
diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs
index 400df60f0e50..2ff09021e536 100644
--- a/control_plane/src/pageserver.rs
+++ b/control_plane/src/pageserver.rs
@@ -1,3 +1,9 @@
+//! Code to manage pageservers
+//!
+//! In the local test environment, the pageserver stores its data directly in
+//!
+//!   .neon/
+//!
 use std::borrow::Cow;
 use std::collections::HashMap;
 use std::fs::File;
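For reference, a hedged sketch of talking to the same `/status` API from outside (it assumes the `reqwest` `blocking` and `json` features and a made-up port 55433; the real polling loop, with its 30-second budget, is in `Endpoint::start` above):

```rust
use std::{thread, time::Duration};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::blocking::Client::new();
    for _ in 0..300 {
        // Ignore connection errors: compute_ctl may not be listening yet.
        if let Ok(resp) = client.get("http://127.0.0.1:55433/status").send() {
            let state: serde_json::Value = resp.json()?;
            // `status` is serialized in snake_case: "init", "running", "failed", ...
            match state["status"].as_str() {
                Some("running") => return Ok(()),
                Some("failed") => {
                    return Err(format!("compute failed: {}", state["error"]).into())
                }
                _ => {}
            }
        }
        thread::sleep(Duration::from_millis(100));
    }
    Err("compute did not reach 'running' in time".into())
}
```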
diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs
index d358f7334325..9e053ff1f19f 100644
--- a/control_plane/src/safekeeper.rs
+++ b/control_plane/src/safekeeper.rs
@@ -1,3 +1,9 @@
+//! Code to manage safekeepers
+//!
+//! In the local test environment, the data for each safekeeper is stored in
+//!
+//!   .neon/safekeepers/<safekeeper id>
+//!
 use std::io::Write;
 use std::path::PathBuf;
 use std::process::Child;
diff --git a/libs/compute_api/src/responses.rs b/libs/compute_api/src/responses.rs
index d181c018b1be..ce73dda08ad8 100644
--- a/libs/compute_api/src/responses.rs
+++ b/libs/compute_api/src/responses.rs
@@ -5,13 +5,13 @@ use serde::{Deserialize, Serialize, Serializer};
 
 use crate::spec::ComputeSpec;
 
-#[derive(Serialize, Debug)]
+#[derive(Serialize, Debug, Deserialize)]
 pub struct GenericAPIError {
     pub error: String,
 }
 
 /// Response of the /status API
-#[derive(Serialize, Debug)]
+#[derive(Serialize, Debug, Deserialize)]
 #[serde(rename_all = "snake_case")]
 pub struct ComputeStatusResponse {
     pub start_time: DateTime<Utc>,
@@ -23,7 +23,7 @@ pub struct ComputeStatusResponse {
     pub error: Option<String>,
 }
 
-#[derive(Serialize)]
+#[derive(Deserialize, Serialize)]
 #[serde(rename_all = "snake_case")]
 pub struct ComputeState {
     pub status: ComputeStatus,
@@ -33,7 +33,7 @@ pub struct ComputeState {
     pub error: Option<String>,
 }
 
-#[derive(Serialize, Clone, Copy, Debug, PartialEq, Eq)]
+#[derive(Serialize, Clone, Copy, Debug, Deserialize, PartialEq, Eq)]
 #[serde(rename_all = "snake_case")]
 pub enum ComputeStatus {
     // Spec wasn't provided at start, waiting for it to be
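The `Deserialize` derives above exist because `neon_local` now reads these types back from `compute_ctl`'s HTTP API. A self-contained sketch of the snake_case wire format (the variant list mirrors the statuses matched in `Endpoint::start`; treat it as illustrative, not the crate's exact definition):

```rust
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
enum ComputeStatus {
    Empty,
    ConfigurationPending,
    Init,
    Running,
    Configuration,
    Failed,
}

fn main() {
    // CamelCase variants are serialized as snake_case strings on the wire.
    let json = serde_json::to_string(&ComputeStatus::ConfigurationPending).unwrap();
    assert_eq!(json, r#""configuration_pending""#);
    let back: ComputeStatus = serde_json::from_str(&json).unwrap();
    assert_eq!(back, ComputeStatus::ConfigurationPending);
}
```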
diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs
index 6072980ed8bb..4014774a7ed0 100644
--- a/libs/compute_api/src/spec.rs
+++ b/libs/compute_api/src/spec.rs
@@ -5,6 +5,7 @@
 //! and connect it to the storage nodes.
 use serde::{Deserialize, Serialize};
 use serde_with::{serde_as, DisplayFromStr};
+use utils::id::{TenantId, TimelineId};
 use utils::lsn::Lsn;
 
 /// String type alias representing Postgres identifier and
@@ -14,7 +15,7 @@ pub type PgIdent = String;
 /// Cluster spec or configuration represented as an optional number of
 /// delta operations + final cluster state description.
 #[serde_as]
-#[derive(Clone, Debug, Default, Deserialize)]
+#[derive(Clone, Debug, Default, Deserialize, Serialize)]
 pub struct ComputeSpec {
     pub format_version: f32,
 
@@ -26,9 +27,32 @@ pub struct ComputeSpec {
     pub cluster: Cluster,
     pub delta_operations: Option<Vec<DeltaOp>>,
 
+    // Information needed to connect to the storage layer.
+    //
+    // `tenant_id`, `timeline_id` and `pageserver_connstring` are always needed.
+    //
+    // Depending on `mode`, this can be a primary read-write node, a read-only
+    // replica, or a read-only node pinned at an older LSN.
+    // `safekeeper_connstrings` must be set for a primary.
+    //
+    // For backwards compatibility, the control plane may leave out all of
+    // these, and instead set the "neon.tenant_id", "neon.timeline_id",
+    // etc. GUCs in cluster.settings. TODO: Once the control plane has been
+    // updated to fill these fields, we can make them non-optional.
+    #[serde_as(as = "Option<DisplayFromStr>")]
+    pub tenant_id: Option<TenantId>,
+    #[serde_as(as = "Option<DisplayFromStr>")]
+    pub timeline_id: Option<TimelineId>,
+    #[serde_as(as = "Option<DisplayFromStr>")]
+    pub pageserver_connstring: Option<String>,
+    #[serde(default)]
+    pub safekeeper_connstrings: Vec<String>,
+
     #[serde(default)]
     pub mode: ComputeMode,
 
+    /// If set, 'storage_auth_token' is used as the password to authenticate to
+    /// the pageserver and safekeepers.
     pub storage_auth_token: Option<String>,
 }
 
@@ -47,13 +71,19 @@ pub enum ComputeMode {
     Replica,
 }
 
-#[derive(Clone, Debug, Default, Deserialize)]
+#[derive(Clone, Debug, Default, Deserialize, Serialize)]
 pub struct Cluster {
-    pub cluster_id: String,
-    pub name: String,
+    pub cluster_id: Option<String>,
+    pub name: Option<String>,
     pub state: Option<String>,
     pub roles: Vec<Role>,
     pub databases: Vec<Database>,
+
+    /// Desired contents of 'postgresql.conf' file. (The 'compute_ctl'
+    /// tool may add additional settings to the final file.)
+    pub postgresql_conf: Option<String>,
+
+    /// Additional settings that will be appended to the 'postgresql.conf' file.
     pub settings: GenericOptions,
 }
 
@@ -63,7 +93,7 @@ pub struct Cluster {
 /// - DROP ROLE
 /// - ALTER ROLE name RENAME TO new_name
 /// - ALTER DATABASE name RENAME TO new_name
-#[derive(Clone, Debug, Deserialize)]
+#[derive(Clone, Debug, Deserialize, Serialize)]
 pub struct DeltaOp {
     pub action: String,
     pub name: PgIdent,
@@ -72,7 +102,7 @@ pub struct DeltaOp {
 
 /// Rust representation of Postgres role info with only those fields
 /// that matter for us.
-#[derive(Clone, Debug, Deserialize)]
+#[derive(Clone, Debug, Deserialize, Serialize)]
 pub struct Role {
     pub name: PgIdent,
     pub encrypted_password: Option<String>,
@@ -81,7 +111,7 @@ pub struct Role {
 
 /// Rust representation of Postgres database info with only those fields
 /// that matter for us.
-#[derive(Clone, Debug, Deserialize)]
+#[derive(Clone, Debug, Deserialize, Serialize)]
 pub struct Database {
     pub name: PgIdent,
     pub owner: PgIdent,
@@ -91,7 +121,7 @@ pub struct Database {
 /// Common type representing both SQL statement params with or without value,
 /// like `LOGIN` or `OWNER username` in the `CREATE/ALTER ROLE`, and config
 /// options like `wal_level = logical`.
-#[derive(Clone, Debug, Deserialize)]
+#[derive(Clone, Debug, Deserialize, Serialize)]
 pub struct GenericOption {
     pub name: String,
     pub value: Option<String>,
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index a810c367d808..551faa116e1a 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -1446,11 +1446,12 @@ def safekeeper_stop(
     def endpoint_create(
         self,
         branch_name: str,
+        pg_port: int,
+        http_port: int,
         endpoint_id: Optional[str] = None,
         tenant_id: Optional[TenantId] = None,
         hot_standby: bool = False,
         lsn: Optional[Lsn] = None,
-        port: Optional[int] = None,
     ) -> "subprocess.CompletedProcess[str]":
         args = [
             "endpoint",
@@ -1464,8 +1465,10 @@ def endpoint_create(
         ]
         if lsn is not None:
             args.extend(["--lsn", str(lsn)])
-        if port is not None:
-            args.extend(["--port", str(port)])
+        if pg_port is not None:
+            args.extend(["--pg-port", str(pg_port)])
+        if http_port is not None:
+            args.extend(["--http-port", str(http_port)])
 
         if endpoint_id is not None:
             args.append(endpoint_id)
         if hot_standby:
@@ -1478,9 +1481,11 @@ def endpoint_start(
     def endpoint_start(
         self,
         endpoint_id: str,
+        pg_port: int,
+        http_port: int,
+        safekeepers: Optional[List[int]] = None,
         tenant_id: Optional[TenantId] = None,
         lsn: Optional[Lsn] = None,
-        port: Optional[int] = None,
     ) -> "subprocess.CompletedProcess[str]":
         args = [
             "endpoint",
@@ -1492,8 +1497,10 @@ def endpoint_start(
         ]
         if lsn is not None:
             args.append(f"--lsn={lsn}")
-        if port is not None:
-            args.append(f"--port={port}")
+        args.extend(["--pg-port", str(pg_port)])
+        args.extend(["--http-port", str(http_port)])
+        if safekeepers is not None:
+            args.extend(["--safekeepers", (",".join(map(str, safekeepers)))])
 
         if endpoint_id is not None:
             args.append(endpoint_id)
 
@@ -2284,17 +2291,24 @@ class Endpoint(PgProtocol):
     """An object representing a Postgres compute endpoint managed by the control plane."""
 
     def __init__(
-        self, env: NeonEnv, tenant_id: TenantId, port: int, check_stop_result: bool = True
+        self,
+        env: NeonEnv,
+        tenant_id: TenantId,
+        pg_port: int,
+        http_port: int,
+        check_stop_result: bool = True,
     ):
-        super().__init__(host="localhost", port=port, user="cloud_admin", dbname="postgres")
+ super().__init__(host="localhost", port=pg_port, user="cloud_admin", dbname="postgres") self.env = env self.running = False self.branch_name: Optional[str] = None # dubious self.endpoint_id: Optional[str] = None # dubious, see asserts below self.pgdata_dir: Optional[str] = None # Path to computenode PGDATA self.tenant_id = tenant_id - self.port = port + self.pg_port = pg_port + self.http_port = http_port self.check_stop_result = check_stop_result + self.active_safekeepers: List[int] = list(map(lambda sk: sk.id, env.safekeepers)) # path to conf is /endpoints//pgdata/postgresql.conf def create( @@ -2324,7 +2338,8 @@ def create( tenant_id=self.tenant_id, lsn=lsn, hot_standby=hot_standby, - port=self.port, + pg_port=self.pg_port, + http_port=self.http_port, ) path = Path("endpoints") / self.endpoint_id / "pgdata" self.pgdata_dir = os.path.join(self.env.repo_dir, path) @@ -2349,7 +2364,13 @@ def start(self) -> "Endpoint": log.info(f"Starting postgres endpoint {self.endpoint_id}") - self.env.neon_cli.endpoint_start(self.endpoint_id, tenant_id=self.tenant_id, port=self.port) + self.env.neon_cli.endpoint_start( + self.endpoint_id, + pg_port=self.pg_port, + http_port=self.http_port, + tenant_id=self.tenant_id, + safekeepers=self.active_safekeepers, + ) self.running = True return self @@ -2373,32 +2394,8 @@ def pg_twophase_dir_path(self) -> str: return os.path.join(self.pg_data_dir_path(), "pg_twophase") def config_file_path(self) -> str: - """Path to postgresql.conf""" - return os.path.join(self.pg_data_dir_path(), "postgresql.conf") - - def adjust_for_safekeepers(self, safekeepers: str) -> "Endpoint": - """ - Adjust instance config for working with wal acceptors instead of - pageserver (pre-configured by CLI) directly. - """ - - # TODO: reuse config() - with open(self.config_file_path(), "r") as f: - cfg_lines = f.readlines() - with open(self.config_file_path(), "w") as f: - for cfg_line in cfg_lines: - # walproposer uses different application_name - if ( - "synchronous_standby_names" in cfg_line - or - # don't repeat safekeepers/wal_acceptors multiple times - "neon.safekeepers" in cfg_line - ): - continue - f.write(cfg_line) - f.write("synchronous_standby_names = 'walproposer'\n") - f.write("neon.safekeepers = '{}'\n".format(safekeepers)) - return self + """Path to the postgresql.conf in the endpoint directory (not the one in pgdata)""" + return os.path.join(self.endpoint_path(), "postgresql.conf") def config(self, lines: List[str]) -> "Endpoint": """ @@ -2503,7 +2500,8 @@ def create_start( ep = Endpoint( self.env, tenant_id=tenant_id or self.env.initial_tenant, - port=self.env.port_distributor.get_port(), + pg_port=self.env.port_distributor.get_port(), + http_port=self.env.port_distributor.get_port(), ) self.num_instances += 1 self.endpoints.append(ep) @@ -2528,7 +2526,8 @@ def create( ep = Endpoint( self.env, tenant_id=tenant_id or self.env.initial_tenant, - port=self.env.port_distributor.get_port(), + pg_port=self.env.port_distributor.get_port(), + http_port=self.env.port_distributor.get_port(), ) if endpoint_id is None: @@ -2911,6 +2910,7 @@ def test_output_dir(request: FixtureRequest, top_output_dir: Path) -> Iterator[P "pg_internal.init", "pg.log", "zenith.signal", + "pg_hba.conf", "postgresql.conf", "postmaster.opts", "postmaster.pid", diff --git a/test_runner/regress/test_compatibility.py b/test_runner/regress/test_compatibility.py index fe8dc293c1f2..2635dbd93c3b 100644 --- a/test_runner/regress/test_compatibility.py +++ b/test_runner/regress/test_compatibility.py @@ -383,6 +383,9 @@ 
     cli_target = NeonCli(config_target)
 
     # And the current binaries to launch computes
+    snapshot_config["neon_distrib_dir"] = str(neon_current_binpath)
+    with (snapshot_config_toml).open("w") as f:
+        toml.dump(snapshot_config, f)
     config_current = copy.copy(config)
     config_current.neon_binpath = neon_current_binpath
     cli_current = NeonCli(config_current)
@@ -391,7 +394,8 @@ def check_neon_works(
     request.addfinalizer(lambda: cli_target.raw_cli(["stop"]))
 
     pg_port = port_distributor.get_port()
-    cli_current.endpoint_start("main", port=pg_port)
+    http_port = port_distributor.get_port()
+    cli_current.endpoint_start("main", pg_port=pg_port, http_port=http_port)
     request.addfinalizer(lambda: cli_current.endpoint_stop("main"))
 
     connstr = f"host=127.0.0.1 port={pg_port} user=cloud_admin dbname=postgres"
diff --git a/test_runner/regress/test_compute_ctl.py b/test_runner/regress/test_compute_ctl.py
deleted file mode 100644
index d72ffe078d83..000000000000
--- a/test_runner/regress/test_compute_ctl.py
+++ /dev/null
@@ -1,253 +0,0 @@
-import os
-from pathlib import Path
-from subprocess import TimeoutExpired
-
-from fixtures.log_helper import log
-from fixtures.neon_fixtures import ComputeCtl, NeonEnvBuilder, PgBin
-
-
-# Test that compute_ctl works and prints "--sync-safekeepers" logs.
-def test_sync_safekeepers_logs(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
-    neon_env_builder.num_safekeepers = 3
-    env = neon_env_builder.init_start()
-    ctl = ComputeCtl(env)
-
-    env.neon_cli.create_branch("test_compute_ctl", "main")
-    endpoint = env.endpoints.create_start("test_compute_ctl")
-    endpoint.safe_psql("CREATE TABLE t(key int primary key, value text)")
-
-    with open(endpoint.config_file_path(), "r") as f:
-        cfg_lines = f.readlines()
-    cfg_map = {}
-    for line in cfg_lines:
-        if "=" in line:
-            k, v = line.split("=")
-            cfg_map[k] = v.strip("\n '\"")
-    log.info(f"postgres config: {cfg_map}")
-    pgdata = endpoint.pg_data_dir_path()
-    pg_bin_path = os.path.join(pg_bin.pg_bin_path, "postgres")
-
-    endpoint.stop_and_destroy()
-
-    # stop_and_destroy removes the whole endpoint directory. Recreate it.
-    Path(pgdata).mkdir(parents=True)
-
-    spec = (
-        """
-{
-    "format_version": 1.0,
-
-    "timestamp": "2021-05-23T18:25:43.511Z",
-    "operation_uuid": "0f657b36-4b0f-4a2d-9c2e-1dcd615e7d8b",
-
-    "cluster": {
-        "cluster_id": "test-cluster-42",
-        "name": "Neon Test",
-        "state": "restarted",
-        "roles": [
-        ],
-        "databases": [
-        ],
-        "settings": [
-            {
-                "name": "fsync",
-                "value": "off",
-                "vartype": "bool"
-            },
-            {
-                "name": "wal_level",
-                "value": "replica",
-                "vartype": "enum"
-            },
-            {
-                "name": "neon.safekeepers",
-                "value": """
-        + f'"{cfg_map["neon.safekeepers"]}"'
-        + """,
-                "vartype": "string"
-            },
-            {
-                "name": "wal_log_hints",
-                "value": "on",
-                "vartype": "bool"
-            },
-            {
-                "name": "log_connections",
-                "value": "on",
-                "vartype": "bool"
-            },
-            {
-                "name": "shared_buffers",
-                "value": "32768",
-                "vartype": "integer"
-            },
-            {
-                "name": "port",
-                "value": """
-        + f'"{cfg_map["port"]}"'
-        + """,
-                "vartype": "integer"
-            },
-            {
-                "name": "max_connections",
-                "value": "100",
-                "vartype": "integer"
-            },
-            {
-                "name": "max_wal_senders",
-                "value": "10",
-                "vartype": "integer"
-            },
-            {
-                "name": "listen_addresses",
-                "value": "0.0.0.0",
-                "vartype": "string"
-            },
-            {
-                "name": "wal_sender_timeout",
-                "value": "0",
-                "vartype": "integer"
-            },
-            {
-                "name": "password_encryption",
-                "value": "md5",
-                "vartype": "enum"
-            },
-            {
-                "name": "maintenance_work_mem",
-                "value": "65536",
-                "vartype": "integer"
-            },
-            {
-                "name": "max_parallel_workers",
-                "value": "8",
-                "vartype": "integer"
-            },
-            {
-                "name": "max_worker_processes",
-                "value": "8",
-                "vartype": "integer"
-            },
-            {
-                "name": "neon.tenant_id",
-                "value": """
-        + f'"{cfg_map["neon.tenant_id"]}"'
-        + """,
-                "vartype": "string"
-            },
-            {
-                "name": "max_replication_slots",
-                "value": "10",
-                "vartype": "integer"
-            },
-            {
-                "name": "neon.timeline_id",
-                "value": """
-        + f'"{cfg_map["neon.timeline_id"]}"'
-        + """,
-                "vartype": "string"
-            },
-            {
-                "name": "shared_preload_libraries",
-                "value": "neon",
-                "vartype": "string"
-            },
-            {
-                "name": "synchronous_standby_names",
-                "value": "walproposer",
-                "vartype": "string"
-            },
-            {
-                "name": "neon.pageserver_connstring",
-                "value": """
-        + f'"{cfg_map["neon.pageserver_connstring"]}"'
-        + """,
-                "vartype": "string"
-            }
-        ]
-    },
-    "delta_operations": [
-    ]
-}
-"""
-    )
-
-    ps_connstr = cfg_map["neon.pageserver_connstring"]
-    log.info(f"ps_connstr: {ps_connstr}, pgdata: {pgdata}")
-
-    # run compute_ctl and wait for 10s
-    try:
-        ctl.raw_cli(
-            [
-                "--connstr",
-                "postgres://invalid/",
-                "--pgdata",
-                pgdata,
-                "--spec",
-                spec,
-                "--pgbin",
-                pg_bin_path,
-            ],
-            timeout=10,
-        )
-    except TimeoutExpired as exc:
-        ctl_logs = (exc.stderr or b"").decode("utf-8")
-        log.info(f"compute_ctl stderr:\n{ctl_logs}")
-
-    with ExternalProcessManager(Path(pgdata) / "postmaster.pid"):
-        start = "starting safekeepers syncing"
-        end = "safekeepers synced at LSN"
-        start_pos = ctl_logs.index(start)
-        assert start_pos != -1
-        end_pos = ctl_logs.index(end, start_pos)
-        assert end_pos != -1
-        sync_safekeepers_logs = ctl_logs[start_pos : end_pos + len(end)]
-        log.info("sync_safekeepers_logs:\n" + sync_safekeepers_logs)
-
-        # assert that --sync-safekeepers logs are present in the output
-        assert "connecting with node" in sync_safekeepers_logs
-        assert "connected with node" in sync_safekeepers_logs
-        assert "proposer connected to quorum (2)" in sync_safekeepers_logs
-        assert "got votes from majority (2)" in sync_safekeepers_logs
-        assert "sending elected msg to node" in sync_safekeepers_logs
-
-
-class ExternalProcessManager:
-    """
-    Context manager that kills a process with a pid file on exit.
-    """
-
-    def __init__(self, pid_file: Path):
-        self.path = pid_file
-        self.pid_file = open(pid_file, "r")
-        self.pid = int(self.pid_file.readline().strip())
-
-    def __enter__(self):
-        return self
-
-    def leave_alive(self):
-        self.pid_file.close()
-
-    def __exit__(self, _type, _value, _traceback):
-        import signal
-        import time
-
-        if self.pid_file.closed:
-            return
-
-        with self.pid_file:
-            try:
-                os.kill(self.pid, signal.SIGTERM)
-            except OSError as e:
-                if not self.path.is_file():
-                    return
-                log.info(f"Failed to kill {self.pid}, but the pidfile remains: {e}")
-                return
-
-            for _ in range(20):
-                if not self.path.is_file():
-                    return
-                time.sleep(0.2)
-
-            log.info("Process failed to stop after SIGTERM: {self.pid}")
-            os.kill(self.pid, signal.SIGKILL)
diff --git a/test_runner/regress/test_neon_local_cli.py b/test_runner/regress/test_neon_local_cli.py
index f6629c54f9b9..3314e7fbf65e 100644
--- a/test_runner/regress/test_neon_local_cli.py
+++ b/test_runner/regress/test_neon_local_cli.py
@@ -9,11 +9,18 @@ def test_neon_cli_basics(neon_env_builder: NeonEnvBuilder, port_distributor: Por
     try:
         env.neon_cli.start()
         env.neon_cli.create_tenant(tenant_id=env.initial_tenant, set_default=True)
-        env.neon_cli.endpoint_start(endpoint_id="ep-main", port=port_distributor.get_port())
+
+        pg_port = port_distributor.get_port()
+        http_port = port_distributor.get_port()
+        env.neon_cli.endpoint_start(
+            endpoint_id="ep-basic-main", pg_port=pg_port, http_port=http_port
+        )
 
         env.neon_cli.create_branch(new_branch_name="migration_check")
+        pg_port = port_distributor.get_port()
+        http_port = port_distributor.get_port()
         env.neon_cli.endpoint_start(
-            endpoint_id="ep-migration_check", port=port_distributor.get_port()
+            endpoint_id="ep-migration_check", pg_port=pg_port, http_port=http_port
         )
     finally:
         env.neon_cli.stop()
diff --git a/test_runner/regress/test_tenant_detach.py b/test_runner/regress/test_tenant_detach.py
index f5e0e34bc9a4..9d0fdcfaf8eb 100644
--- a/test_runner/regress/test_tenant_detach.py
+++ b/test_runner/regress/test_tenant_detach.py
@@ -59,6 +59,13 @@ def test_tenant_reattach(
     # create new nenant
     tenant_id, timeline_id = env.neon_cli.create_tenant()
 
+    # Attempts to connect from compute to pageserver while the tenant is
+    # temporarily detached produces these errors in the pageserver log.
+    env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
+    env.pageserver.allowed_errors.append(
+        f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
+    )
+
     with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
         with endpoint.cursor() as cur:
             cur.execute("CREATE TABLE t(key int primary key, value text)")
@@ -223,13 +230,6 @@ def test_tenant_reattach_while_busy(
     )
     env = neon_env_builder.init_start()
 
-    # Attempts to connect from compute to pageserver while the tenant is
-    # temporarily detached produces these errors in the pageserver log.
-    env.pageserver.allowed_errors.append(".*Tenant .* not found.*")
-    env.pageserver.allowed_errors.append(
-        ".*Tenant .* will not become active\\. Current state: Stopping.*"
-    )
-
     pageserver_http = env.pageserver.http_client()
 
     # create new nenant
@@ -238,6 +238,13 @@ def test_tenant_reattach_while_busy(
         conf={"checkpoint_distance": "100000"}
     )
 
+    # Attempts to connect from compute to pageserver while the tenant is
+    # temporarily detached produces these errors in the pageserver log.
+    env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
+    env.pageserver.allowed_errors.append(
+        f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
+    )
+
     endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
 
     cur = endpoint.connect().cursor()
@@ -275,6 +282,13 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder):
     # create new nenant
     tenant_id, timeline_id = env.neon_cli.create_tenant()
 
+    # Attempts to connect from compute to pageserver while the tenant is
+    # temporarily detached produces these errors in the pageserver log.
+    env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
+    env.pageserver.allowed_errors.append(
+        f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
+    )
+
     # assert tenant exists on disk
     assert (env.repo_dir / "tenants" / str(tenant_id)).exists()
 
@@ -336,6 +350,13 @@ def test_tenant_detach_ignored_tenant(neon_simple_env: NeonEnv):
     # create a new tenant
     tenant_id, _ = env.neon_cli.create_tenant()
 
+    # Attempts to connect from compute to pageserver while the tenant is
+    # temporarily detached produces these errors in the pageserver log.
+    env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
+    env.pageserver.allowed_errors.append(
+        f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
+    )
+
     # assert tenant exists on disk
     assert (env.repo_dir / "tenants" / str(tenant_id)).exists()
 
@@ -385,6 +406,13 @@ def test_tenant_detach_regular_tenant(neon_simple_env: NeonEnv):
     # create a new tenant
     tenant_id, _ = env.neon_cli.create_tenant()
 
+    # Attempts to connect from compute to pageserver while the tenant is
+    # temporarily detached produces these errors in the pageserver log.
+    env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
+    env.pageserver.allowed_errors.append(
+        f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
+    )
+
     # assert tenant exists on disk
     assert (env.repo_dir / "tenants" / str(tenant_id)).exists()
 
@@ -399,6 +427,7 @@ def test_tenant_detach_regular_tenant(neon_simple_env: NeonEnv):
 
     log.info("detaching regular tenant with detach ignored flag")
     client.tenant_detach(tenant_id, True)
+
     log.info("regular tenant detached without error")
 
     # check that nothing is left on disk for deleted tenant
@@ -432,6 +461,13 @@ def test_detach_while_attaching(
     tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
     timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
 
+    # Attempts to connect from compute to pageserver while the tenant is
+    # temporarily detached produces these errors in the pageserver log.
+    env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
+    env.pageserver.allowed_errors.append(
+        f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
+    )
+
     # Create table, and insert some rows. Make it big enough that it doesn't fit in
     # shared_buffers, otherwise the SELECT after restart will just return answer
     # from shared_buffers without hitting the page server, which defeats the point
@@ -577,6 +613,13 @@ def test_ignored_tenant_download_missing_layers(
     tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
     timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
 
+    # Attempts to connect from compute to pageserver while the tenant is
+    # temporarily detached produces these errors in the pageserver log.
+ env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*") + env.pageserver.allowed_errors.append( + f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*" + ) + data_id = 1 data_secret = "very secret secret" insert_test_data(pageserver_http, tenant_id, timeline_id, data_id, data_secret, endpoint) @@ -636,6 +679,13 @@ def test_ignored_tenant_stays_broken_without_metadata( tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0]) timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0]) + # Attempts to connect from compute to pageserver while the tenant is + # temporarily detached produces these errors in the pageserver log. + env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*") + env.pageserver.allowed_errors.append( + f".*Tenant {tenant_id} will not become active\\. Current state: Broken.*" + ) + # ignore the tenant and remove its metadata pageserver_http.tenant_ignore(tenant_id) tenant_timeline_dir = env.repo_dir / "tenants" / str(tenant_id) / "timelines" / str(timeline_id) @@ -672,6 +722,13 @@ def test_load_attach_negatives( tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0]) + # Attempts to connect from compute to pageserver while the tenant is + # temporarily detached produces these errors in the pageserver log. + env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*") + env.pageserver.allowed_errors.append( + f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*" + ) + env.pageserver.allowed_errors.append(".*tenant .*? already exists, state:.*") with pytest.raises( expected_exception=PageserverApiException, @@ -714,6 +771,13 @@ def test_ignore_while_attaching( tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0]) timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0]) + # Attempts to connect from compute to pageserver while the tenant is + # temporarily detached produces these errors in the pageserver log. + env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*") + env.pageserver.allowed_errors.append( + f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*" + ) + data_id = 1 data_secret = "very secret secret" insert_test_data(pageserver_http, tenant_id, timeline_id, data_id, data_secret, endpoint) diff --git a/test_runner/regress/test_tenant_size.py b/test_runner/regress/test_tenant_size.py index 60ab268882c0..e9dcd1e5cdee 100644 --- a/test_runner/regress/test_tenant_size.py +++ b/test_runner/regress/test_tenant_size.py @@ -318,7 +318,7 @@ def test_only_heads_within_horizon(neon_simple_env: NeonEnv, test_output_dir: Pa def test_single_branch_get_tenant_size_grows( - neon_env_builder: NeonEnvBuilder, test_output_dir: Path + neon_env_builder: NeonEnvBuilder, test_output_dir: Path, pg_version: PgVersion ): """ Operate on single branch reading the tenants size after each transaction. @@ -333,6 +333,13 @@ def test_single_branch_get_tenant_size_grows( # that there next_gc_cutoff could be smaller than initdb_lsn, which will # obviously lead to issues when calculating the size. gc_horizon = 0x38000 + + # it's a bit of a hack, but different versions of postgres have different + # amount of WAL generated for the same amount of data. so we need to + # adjust the gc_horizon accordingly. 
+    if pg_version == PgVersion.V14:
+        gc_horizon = 0x40000
+
     neon_env_builder.pageserver_config_override = f"tenant_config={{compaction_period='0s', gc_period='0s', pitr_interval='0sec', gc_horizon={gc_horizon}}}"
 
     env = neon_env_builder.init_start()
diff --git a/test_runner/regress/test_tenants.py b/test_runner/regress/test_tenants.py
index 15712b9e5506..aef2df49321c 100644
--- a/test_runner/regress/test_tenants.py
+++ b/test_runner/regress/test_tenants.py
@@ -267,6 +267,7 @@ def test_pageserver_metrics_removed_after_detach(
             cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
             cur.execute("SELECT sum(key) FROM t")
             assert cur.fetchone() == (5000050000,)
+    endpoint.stop()
 
     def get_ps_metric_samples_for_tenant(tenant_id: TenantId) -> List[Sample]:
         ps_metrics = env.pageserver.http_client().get_metrics()
diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py
index 2a4141ed30be..8b595596cb94 100644
--- a/test_runner/regress/test_wal_acceptor.py
+++ b/test_runner/regress/test_wal_acceptor.py
@@ -1001,9 +1001,6 @@ def test_safekeeper_without_pageserver(
 
 
 def test_replace_safekeeper(neon_env_builder: NeonEnvBuilder):
-    def safekeepers_guc(env: NeonEnv, sk_names: List[int]) -> str:
-        return ",".join([f"localhost:{sk.port.pg}" for sk in env.safekeepers if sk.id in sk_names])
-
     def execute_payload(endpoint: Endpoint):
         with closing(endpoint.connect()) as conn:
             with conn.cursor() as cur:
@@ -1032,9 +1029,8 @@ def show_statuses(safekeepers: List[Safekeeper], tenant_id: TenantId, timeline_i
 
     log.info("Use only first 3 safekeepers")
     env.safekeepers[3].stop()
-    active_safekeepers = [1, 2, 3]
     endpoint = env.endpoints.create("test_replace_safekeeper")
-    endpoint.adjust_for_safekeepers(safekeepers_guc(env, active_safekeepers))
+    endpoint.active_safekeepers = [1, 2, 3]
     endpoint.start()
 
     # learn neon timeline from compute
@@ -1072,9 +1068,8 @@ def show_statuses(safekeepers: List[Safekeeper], tenant_id: TenantId, timeline_i
 
     log.info("Recreate postgres to replace failed sk1 with new sk4")
     endpoint.stop_and_destroy().create("test_replace_safekeeper")
-    active_safekeepers = [2, 3, 4]
     env.safekeepers[3].start()
-    endpoint.adjust_for_safekeepers(safekeepers_guc(env, active_safekeepers))
+    endpoint.active_safekeepers = [2, 3, 4]
     endpoint.start()
 
     execute_payload(endpoint)
@@ -1293,9 +1288,8 @@ def show_statuses(safekeepers: List[Safekeeper], tenant_id: TenantId, timeline_i
 
     log.info("Use only first 3 safekeepers")
     env.safekeepers[3].stop()
-    active_safekeepers = [1, 2, 3]
     endpoint = env.endpoints.create("test_pull_timeline")
-    endpoint.adjust_for_safekeepers(safekeepers_guc(env, active_safekeepers))
+    endpoint.active_safekeepers = [1, 2, 3]
     endpoint.start()
 
     # learn neon timeline from compute
@@ -1332,10 +1326,8 @@ def show_statuses(safekeepers: List[Safekeeper], tenant_id: TenantId, timeline_i
     show_statuses(env.safekeepers, tenant_id, timeline_id)
 
     log.info("Restarting compute with new config to verify that it works")
-    active_safekeepers = [1, 3, 4]
-
     endpoint.stop_and_destroy().create("test_pull_timeline")
-    endpoint.adjust_for_safekeepers(safekeepers_guc(env, active_safekeepers))
+    endpoint.active_safekeepers = [1, 3, 4]
     endpoint.start()
 
     execute_payload(endpoint)
diff --git a/test_runner/regress/test_wal_acceptor_async.py b/test_runner/regress/test_wal_acceptor_async.py
index 7debeed1406e..ce33975a0e7b 100644
--- a/test_runner/regress/test_wal_acceptor_async.py
+++ b/test_runner/regress/test_wal_acceptor_async.py
@@ -2,9 +2,11 @@
 import random
 import time
 from dataclasses import dataclass
+from pathlib import Path
 from typing import List, Optional
 
 import asyncpg
+import toml
 from fixtures.log_helper import getLogger
 from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonEnvBuilder, Safekeeper
 from fixtures.types import Lsn, TenantId, TimelineId
@@ -251,7 +253,8 @@ def endpoint_create_start(env: NeonEnv, branch: str, pgdir_name: Optional[str]):
     endpoint = Endpoint(
         env,
         tenant_id=env.initial_tenant,
-        port=env.port_distributor.get_port(),
+        pg_port=env.port_distributor.get_port(),
+        http_port=env.port_distributor.get_port(),
         # In these tests compute has high probability of terminating on its own
         # before our stop() due to lost consensus leadership.
         check_stop_result=False,
@@ -536,15 +539,20 @@ def test_race_conditions(neon_env_builder: NeonEnvBuilder):
 
 # Check that pageserver can select safekeeper with largest commit_lsn
 # and switch if LSN is not updated for some time (NoWalTimeout).
-async def run_wal_lagging(env: NeonEnv, endpoint: Endpoint):
-    def safekeepers_guc(env: NeonEnv, active_sk: List[bool]) -> str:
-        # use ports 10, 11 and 12 to simulate unavailable safekeepers
-        return ",".join(
-            [
-                f"localhost:{sk.port.pg if active else 10 + i}"
-                for i, (sk, active) in enumerate(zip(env.safekeepers, active_sk))
-            ]
-        )
+async def run_wal_lagging(env: NeonEnv, endpoint: Endpoint, test_output_dir: Path):
+    def adjust_safekeepers(env: NeonEnv, active_sk: List[bool]):
+        # Change the pg ports of the inactive safekeepers in the config file to be
+        # invalid, to make them unavailable to the endpoint. We use
+        # ports 10, 11 and 12 to simulate unavailable safekeepers.
+        config = toml.load(test_output_dir / "repo" / "config")
+        for i, (sk, active) in enumerate(zip(env.safekeepers, active_sk)):
+            if active:
+                config["safekeepers"][i]["pg_port"] = env.safekeepers[i].port.pg
+            else:
+                config["safekeepers"][i]["pg_port"] = 10 + i
+
+        with open(test_output_dir / "repo" / "config", "w") as f:
+            toml.dump(config, f)
 
     conn = await endpoint.connect_async()
     await conn.execute("CREATE TABLE t(key int primary key, value text)")
@@ -565,7 +573,7 @@ def safekeepers_guc(env: NeonEnv, active_sk: List[bool]) -> str:
             it -= 1
             continue
 
-        endpoint.adjust_for_safekeepers(safekeepers_guc(env, active_sk))
+        adjust_safekeepers(env, active_sk)
         log.info(f"Iteration {it}: {active_sk}")
 
         endpoint.start()
@@ -579,7 +587,7 @@ def safekeepers_guc(env: NeonEnv, active_sk: List[bool]) -> str:
     await conn.close()
     endpoint.stop()
 
-    endpoint.adjust_for_safekeepers(safekeepers_guc(env, [True] * len(env.safekeepers)))
+    adjust_safekeepers(env, [True] * len(env.safekeepers))
     endpoint.start()
 
     conn = await endpoint.connect_async()
@@ -590,11 +598,11 @@ def safekeepers_guc(env: NeonEnv, active_sk: List[bool]) -> str:
 
 
 # do inserts while restarting postgres and messing with safekeeper addresses
-def test_wal_lagging(neon_env_builder: NeonEnvBuilder):
+def test_wal_lagging(neon_env_builder: NeonEnvBuilder, test_output_dir: Path):
     neon_env_builder.num_safekeepers = 3
     env = neon_env_builder.init_start()
     env.neon_cli.create_branch("test_wal_lagging")
 
     endpoint = env.endpoints.create_start("test_wal_lagging")
-    asyncio.run(run_wal_lagging(env, endpoint))
+    asyncio.run(run_wal_lagging(env, endpoint, test_output_dir))
diff --git a/test_runner/regress/test_wal_receiver.py b/test_runner/regress/test_wal_receiver.py
index 8e4e154be1fd..515d47c079ee 100644
--- a/test_runner/regress/test_wal_receiver.py
+++ b/test_runner/regress/test_wal_receiver.py
@@ -77,7 +77,8 @@ def test_pageserver_lsn_wait_error_safekeeper_stop(neon_env_builder: NeonEnvBuil
     try:
         trigger_wait_lsn_timeout(env, tenant_id)
     except Exception as e:
-        exception_string = str(e)
+        # Strip out the part before stdout, as it contains the full command with the list of all safekeepers
+        exception_string = str(e).split("stdout", 1)[-1]
         assert expected_timeout_error in exception_string, "Should time out during waiting for WAL"
 
     for safekeeper in env.safekeepers:
diff --git a/test_runner/regress/test_walredo_not_left_behind_on_detach.py b/test_runner/regress/test_walredo_not_left_behind_on_detach.py
index 7d944bebb3cb..4a478989356a 100644
--- a/test_runner/regress/test_walredo_not_left_behind_on_detach.py
+++ b/test_runner/regress/test_walredo_not_left_behind_on_detach.py
@@ -83,6 +83,9 @@ def test_walredo_not_left_behind_on_detach(neon_env_builder: NeonEnvBuilder):
     # XXX this is quite brittle as the lifecycle of the WAL redo process is an implementation detail
     assert_child_processes(pagserver_pid, wal_redo_present=True, defunct_present=False)
 
+    # Stop the compute before detaching, to avoid errors in the log.
+    endpoint.stop()
+
     last_error = None
     for i in range(3):
        try:
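
A note on the test_tenant_detach.py hunks above: the same two allowed_errors patterns are registered verbatim in roughly ten tests. If this keeps spreading, a small shared helper would cut the duplication. Below is a minimal sketch, reusing only names that appear in the diff (NeonEnv, TenantId, env.pageserver.allowed_errors); the helper itself, allow_tenant_detach_errors, and its placement in the shared fixtures are hypothetical and not part of this change:

    # Hypothetical helper (assumption), e.g. in test_runner/fixtures/neon_fixtures.py.
    from fixtures.neon_fixtures import NeonEnv
    from fixtures.types import TenantId

    def allow_tenant_detach_errors(env: NeonEnv, tenant_id: TenantId, state: str = "Stopping") -> None:
        # Attempts to connect from compute to pageserver while the tenant is
        # temporarily detached produce these errors in the pageserver log.
        env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
        env.pageserver.allowed_errors.append(
            f".*Tenant {tenant_id} will not become active\\. Current state: {state}.*"
        )

Each call site above would then shrink to allow_tenant_detach_errors(env, tenant_id), passing state="Broken" for the test_ignored_tenant_stays_broken_without_metadata case.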