From 01ee0d5607bcfc482eb76fd78dc436b4a9240bfb Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas
Date: Fri, 5 May 2023 13:42:50 +0300
Subject: [PATCH 01/12] Add --http-port option to compute_ctl.

If the port is hard-coded, you cannot have more than one compute_ctl
running at a time. (Or none at all, if you're unlucky and the port is
already in use.)
---
 compute_tools/src/bin/compute_ctl.rs | 14 +++++++++++++-
 compute_tools/src/http/api.rs        |  8 ++++----
 2 files changed, 17 insertions(+), 5 deletions(-)

diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs
index 2f515c9bf1d4..c6cfde1d1a4d 100644
--- a/compute_tools/src/bin/compute_ctl.rs
+++ b/compute_tools/src/bin/compute_ctl.rs
@@ -59,6 +59,9 @@ fn main() -> Result<()> {
     let matches = cli().get_matches();
 
+    let http_port = *matches
+        .get_one::<u16>("http-port")
+        .expect("http-port is required");
     let pgdata = matches
         .get_one::<String>("pgdata")
         .expect("PGDATA path is required");
@@ -178,7 +181,8 @@ fn main() -> Result<()> {
 
     // Launch http service first, so we were able to serve control-plane
     // requests, while configuration is still in progress.
-    let _http_handle = launch_http_server(&compute).expect("cannot launch http endpoint thread");
+    let _http_handle =
+        launch_http_server(http_port, &compute).expect("cannot launch http endpoint thread");
 
     if !spec_set {
         // No spec provided, hang waiting for it.
@@ -286,6 +290,14 @@ fn cli() -> clap::Command {
     let version = option_env!("CARGO_PKG_VERSION").unwrap_or("unknown");
     clap::Command::new("compute_ctl")
         .version(version)
+        .arg(
+            Arg::new("http-port")
+                .long("http-port")
+                .value_name("HTTP_PORT")
+                .default_value("3080")
+                .value_parser(clap::value_parser!(u16))
+                .required(false),
+        )
         .arg(
             Arg::new("connstr")
                 .short('C')
diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs
index 4468f6f5e49a..afd9c2fb5479 100644
--- a/compute_tools/src/http/api.rs
+++ b/compute_tools/src/http/api.rs
@@ -220,8 +220,8 @@ fn render_json_error(e: &str, status: StatusCode) -> Response<Body> {
 
 // Main Hyper HTTP server function that runs it and blocks waiting on it forever.
 #[tokio::main]
-async fn serve(state: Arc<ComputeNode>) {
-    let addr = SocketAddr::from(([0, 0, 0, 0], 3080));
+async fn serve(port: u16, state: Arc<ComputeNode>) {
+    let addr = SocketAddr::from(([0, 0, 0, 0], port));
 
     let make_service = make_service_fn(move |_conn| {
         let state = state.clone();
@@ -256,10 +256,10 @@ async fn serve(state: Arc<ComputeNode>) {
 }
 
 /// Launch a separate Hyper HTTP API server thread and return its `JoinHandle`.
-pub fn launch_http_server(state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
+pub fn launch_http_server(port: u16, state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
     let state = Arc::clone(state);
 
     Ok(thread::Builder::new()
         .name("http-endpoint".into())
-        .spawn(move || serve(state))?)
+        .spawn(move || serve(port, state))?)
 }

From 227da024c842de7f997f263ac2f8cff93fe05e99 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas
Date: Fri, 5 May 2023 16:19:50 +0300
Subject: [PATCH 02/12] Make cluster_id and name optional in the compute spec
 file.

'compute_ctl' merely prints them to the log. The next commit will make
the local test control plane also use 'compute_ctl'; the local control
plane doesn't have the concept of a project, so it cannot fill them in
with any sensible values.
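For illustration, with this change a spec file can carry a 'cluster'
section as sparse as the following sketch, with no 'cluster_id' or
'name' at all (the field values here are hypothetical):

    "cluster": {
        "roles": [],
        "databases": [],
        "settings": []
    }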
---
 compute_tools/src/compute.rs | 4 ++--
 libs/compute_api/src/spec.rs | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs
index a7746629a858..b3865932ebe4 100644
--- a/compute_tools/src/compute.rs
+++ b/compute_tools/src/compute.rs
@@ -376,7 +376,7 @@ impl ComputeNode {
 
         info!(
             "finished configuration of compute for project {}",
-            spec.cluster.cluster_id
+            spec.cluster.cluster_id.as_deref().unwrap_or("None")
         );
 
         Ok(())
@@ -434,7 +434,7 @@ impl ComputeNode {
         let spec = compute_state.pspec.as_ref().expect("spec must be set");
         info!(
             "starting compute for project {}, operation {}, tenant {}, timeline {}",
-            spec.spec.cluster.cluster_id,
+            spec.spec.cluster.cluster_id.as_deref().unwrap_or("None"),
             spec.spec.operation_uuid.as_deref().unwrap_or("None"),
             spec.tenant_id,
             spec.timeline_id,
diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs
index 6072980ed8bb..770eca23b708 100644
--- a/libs/compute_api/src/spec.rs
+++ b/libs/compute_api/src/spec.rs
@@ -49,8 +49,8 @@ pub enum ComputeMode {
 
 #[derive(Clone, Debug, Default, Deserialize)]
 pub struct Cluster {
-    pub cluster_id: String,
-    pub name: String,
+    pub cluster_id: Option<String>,
+    pub name: Option<String>,
     pub state: Option<String>,
     pub roles: Vec<Role>,
     pub databases: Vec<Database>,

From 0c4d16b6b0c2a556c388c07cceabb1f3210abd25 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas
Date: Tue, 9 May 2023 20:24:54 +0300
Subject: [PATCH 03/12] Use compute_ctl to launch endpoints.

This adds test coverage for 'compute_ctl', as it is now used by all
the python tests.

There are a few differences in how 'compute_ctl' is called in the
tests, compared to the real web console:

- In the tests, the postgresql.conf file is included as one large
  string in the spec file, and it is written out as it is to the data
  directory. I added a new field for that to the spec file. The real
  web console, however, sets all the necessary settings in the
  'settings' field, and 'compute_ctl' creates the postgresql.conf from
  those settings.

- In the tests, the information needed to connect to the storage
  (tenant_id, timeline_id, and the connection strings to the pageserver
  and safekeepers) is now passed as new fields in the spec file. The
  real web console includes them as GUCs in the 'settings' field.

(Both of these are different from what the test control plane used to
do: it used to write the GUCs directly into the postgresql.conf file.)
The plan is to change the control plane to use the new method, and
remove the old method, but for now, support both.

Some tests that were sensitive to the amount of WAL generated needed
small changes, to accommodate the background health monitor that
compute_ctl runs, as it makes a few small updates. Also, some tests
shut down the pageserver; the background health check can now run
queries while the pageserver is down, which can produce a few extra
errors in the logs that needed to be allowlisted.
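To illustrate the new method, the spec file that the local control plane
writes for an endpoint now looks roughly like this (the IDs, port numbers
and settings below are made-up examples):

    {
        "format_version": 1.0,
        "cluster": {
            "roles": [],
            "databases": [],
            "postgresql_conf": "shared_buffers = '1MB'\nport = 55432\n"
        },
        "tenant_id": "9ef87a5bf0d92544f6fafeeb3239695c",
        "timeline_id": "de200bd42b49cc1814412c7e592dd6e9",
        "mode": "Primary",
        "pageserver_connstring": "postgresql://no_user@127.0.0.1:64000",
        "safekeeper_connstrings": ["127.0.0.1:5454"]
    }

The new neon_local flags can also be exercised by hand, e.g.:

    neon_local endpoint start main --pg-port 55432 --http-port 55433 --safekeepers 1,2,3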
---
 compute_tools/src/compute.rs                  |  46 +-
 compute_tools/src/config.rs                   |  44 +-
 compute_tools/src/pg_helpers.rs               |   2 +-
 control_plane/src/bin/neon_local.rs           |  70 ++-
 control_plane/src/broker.rs                   |   6 +
 control_plane/src/endpoint.rs                 | 581 +++++++++---------
 control_plane/src/local_env.rs                |   2 +-
 control_plane/src/pageserver.rs               |   6 +
 control_plane/src/safekeeper.rs               |   6 +
 libs/compute_api/src/responses.rs             |   8 +-
 libs/compute_api/src/spec.rs                  |  42 +-
 test_runner/fixtures/neon_fixtures.py         |  78 +--
 test_runner/regress/test_compatibility.py     |   6 +-
 test_runner/regress/test_compute_ctl.py       | 253 --------
 test_runner/regress/test_neon_local_cli.py    |  11 +-
 test_runner/regress/test_tenant_detach.py     |  78 ++-
 test_runner/regress/test_tenant_size.py       |   2 +-
 test_runner/regress/test_tenants.py           |   1 +
 test_runner/regress/test_wal_acceptor.py      |  16 +-
 .../regress/test_wal_acceptor_async.py        |  36 +-
 .../test_walredo_not_left_behind_on_detach.py |   3 +
 21 files changed, 635 insertions(+), 662 deletions(-)
 delete mode 100644 test_runner/regress/test_compute_ctl.py

diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs
index b3865932ebe4..e5d189268187 100644
--- a/compute_tools/src/compute.rs
+++ b/compute_tools/src/compute.rs
@@ -106,26 +106,38 @@ pub struct ParsedSpec {
 impl TryFrom<ComputeSpec> for ParsedSpec {
     type Error = String;
     fn try_from(spec: ComputeSpec) -> Result<Self, String> {
+        // Extract the options from the spec file that are needed to connect to
+        // the storage system.
+        //
+        // For backwards-compatibility, the top-level fields in the spec file
+        // may be empty. In that case, we need to dig them from the GUCs in the
+        // cluster.settings field.
         let pageserver_connstr = spec
-            .cluster
-            .settings
-            .find("neon.pageserver_connstring")
+            .pageserver_connstring
+            .clone()
+            .or_else(|| spec.cluster.settings.find("neon.pageserver_connstring"))
            .ok_or("pageserver connstr should be provided")?;
         let storage_auth_token = spec.storage_auth_token.clone();
-        let tenant_id: TenantId = spec
-            .cluster
-            .settings
-            .find("neon.tenant_id")
-            .ok_or("tenant id should be provided")
-            .map(|s| TenantId::from_str(&s))?
-            .or(Err("invalid tenant id"))?;
-        let timeline_id: TimelineId = spec
-            .cluster
-            .settings
-            .find("neon.timeline_id")
-            .ok_or("timeline id should be provided")
-            .map(|s| TimelineId::from_str(&s))?
-            .or(Err("invalid timeline id"))?;
+        let tenant_id: TenantId = if let Some(tenant_id) = spec.tenant_id {
+            tenant_id
+        } else {
+            spec.cluster
+                .settings
+                .find("neon.tenant_id")
+                .ok_or("tenant id should be provided")
+                .map(|s| TenantId::from_str(&s))?
+                .or(Err("invalid tenant id"))?
+        };
+        let timeline_id: TimelineId = if let Some(timeline_id) = spec.timeline_id {
+            timeline_id
+        } else {
+            spec.cluster
+                .settings
+                .find("neon.timeline_id")
+                .ok_or("timeline id should be provided")
+                .map(|s| TimelineId::from_str(&s))?
+                .or(Err("invalid timeline id"))?
+        };
 
         Ok(ParsedSpec {
             spec,
diff --git a/compute_tools/src/config.rs b/compute_tools/src/config.rs
index 1168f3876af3..99346433d062 100644
--- a/compute_tools/src/config.rs
+++ b/compute_tools/src/config.rs
@@ -5,6 +5,7 @@ use std::path::Path;
 
 use anyhow::Result;
 
+use crate::pg_helpers::escape_conf_value;
 use crate::pg_helpers::PgOptionsSerialize;
 use compute_api::spec::{ComputeMode, ComputeSpec};
 
@@ -36,10 +37,44 @@ pub fn write_postgres_conf(path: &Path, spec: &ComputeSpec) -> Result<()> {
     // File::create() destroys the file content if it exists.
     let mut file = File::create(path)?;
 
-    writeln!(file, "# Managed by compute_ctl: begin")?;
+    // Write the postgresql.conf content from the spec file as is.
+    if let Some(conf) = &spec.cluster.postgresql_conf {
+        writeln!(file, "{}", conf)?;
+    }
 
-    write!(file, "{}", &spec.cluster.settings.as_pg_settings())?;
+    // Add options for connecting to storage
+    writeln!(file, "# Neon storage settings")?;
+    if let Some(s) = &spec.pageserver_connstring {
+        writeln!(
+            file,
+            "neon.pageserver_connstring='{}'",
+            escape_conf_value(s)
+        )?;
+    }
+    if !spec.safekeeper_connstrings.is_empty() {
+        writeln!(
+            file,
+            "neon.safekeepers='{}'",
+            escape_conf_value(&spec.safekeeper_connstrings.join(","))
+        )?;
+    }
+    if let Some(s) = &spec.tenant_id {
+        writeln!(
+            file,
+            "neon.tenant_id='{}'",
+            escape_conf_value(&s.to_string())
+        )?;
+    }
+    if let Some(s) = &spec.timeline_id {
+        writeln!(
+            file,
+            "neon.timeline_id='{}'",
+            escape_conf_value(&s.to_string())
+        )?;
+    }
+
     match spec.mode {
         ComputeMode::Primary => {}
         ComputeMode::Static(lsn) => {
@@ -53,7 +88,12 @@ pub fn write_postgres_conf(path: &Path, spec: &ComputeSpec) -> Result<()> {
         }
     }
 
-    writeln!(file, "# Managed by compute_ctl: end")?;
+    // If there are any extra options in the 'settings' field, append those
+    if spec.cluster.settings.is_some() {
+        writeln!(file, "# Managed by compute_ctl: begin")?;
+        write!(file, "{}", spec.cluster.settings.as_pg_settings())?;
+        writeln!(file, "# Managed by compute_ctl: end")?;
+    }
 
     Ok(())
 }
diff --git a/compute_tools/src/pg_helpers.rs b/compute_tools/src/pg_helpers.rs
index ed00485d5ac3..d5c845e9eaae 100644
--- a/compute_tools/src/pg_helpers.rs
+++ b/compute_tools/src/pg_helpers.rs
@@ -23,7 +23,7 @@ fn escape_literal(s: &str) -> String {
 
 /// Escape a string so that it can be used in postgresql.conf.
 /// Same as escape_literal, currently.
-fn escape_conf_value(s: &str) -> String {
+pub fn escape_conf_value(s: &str) -> String {
     s.replace('\'', "''").replace('\\', "\\\\")
 }
 
diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs
index 39551642c0b6..52af936d7b7d 100644
--- a/control_plane/src/bin/neon_local.rs
+++ b/control_plane/src/bin/neon_local.rs
@@ -476,10 +476,11 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
             println!("Creating endpoint for imported timeline ...");
 
             cplane.new_endpoint(
-                tenant_id,
                 name,
+                tenant_id,
                 timeline_id,
                 None,
+                None,
                 pg_version,
                 ComputeMode::Primary,
             )?;
@@ -591,7 +592,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
 
                 table.add_row([
                     endpoint_id.as_str(),
-                    &endpoint.address.to_string(),
+                    &endpoint.pg_address.to_string(),
                     &endpoint.timeline_id.to_string(),
                     branch_name,
                     lsn_str.as_str(),
@@ -620,8 +621,8 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
                 .get_branch_timeline_id(branch_name, tenant_id)
                 .ok_or_else(|| anyhow!("Found no timeline id for branch name '{branch_name}'"))?;
 
-            let port: Option<u16> = sub_args.get_one::<u16>("port").copied();
-
+            let pg_port: Option<u16> = sub_args.get_one::<u16>("pg-port").copied();
+            let http_port: Option<u16> = sub_args.get_one::<u16>("http-port").copied();
             let pg_version = sub_args
                 .get_one::<u32>("pg-version")
                 .copied()
@@ -639,14 +640,38 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
                 (Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
             };
 
-            cplane.new_endpoint(tenant_id, &endpoint_id, timeline_id, port, pg_version, mode)?;
+            cplane.new_endpoint(
+                &endpoint_id,
+                tenant_id,
+                timeline_id,
+                pg_port,
+                http_port,
+                pg_version,
+                mode,
+            )?;
         }
         "start" => {
-            let port: Option<u16> = sub_args.get_one::<u16>("port").copied();
+            let pg_port: Option<u16> = sub_args.get_one::<u16>("pg-port").copied();
+            let http_port: Option<u16> = sub_args.get_one::<u16>("http-port").copied();
             let endpoint_id = sub_args
                 .get_one::<String>("endpoint_id")
                 .ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?;
 
+            // If --safekeepers argument is given, use only the listed safekeeper nodes.
+            let safekeepers =
+                if let Some(safekeepers_str) = sub_args.get_one::<String>("safekeepers") {
+                    let mut safekeepers: Vec<NodeId> = Vec::new();
+                    for sk_id in safekeepers_str.split(',').map(str::trim) {
+                        let sk_id = NodeId(u64::from_str(sk_id).map_err(|_| {
+                            anyhow!("invalid node ID \"{sk_id}\" in --safekeepers list")
+                        })?);
+                        safekeepers.push(sk_id);
+                    }
+                    safekeepers
+                } else {
+                    env.safekeepers.iter().map(|sk| sk.id).collect()
+                };
+
             let endpoint = cplane.endpoints.get(endpoint_id.as_str());
 
             let auth_token = if matches!(env.pageserver.pg_auth_type, AuthType::NeonJWT) {
@@ -673,7 +698,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
                     _ => {}
                 }
                 println!("Starting existing endpoint {endpoint_id}...");
-                endpoint.start(&auth_token)?;
+                endpoint.start(&auth_token, safekeepers)?;
             } else {
                 let branch_name = sub_args
                     .get_one::<String>("branch-name")
@@ -709,14 +734,15 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
                 println!("Starting new endpoint {endpoint_id} (PostgreSQL v{pg_version}) on timeline {timeline_id} ...");
 
                 let ep = cplane.new_endpoint(
-                    tenant_id,
                     endpoint_id,
+                    tenant_id,
                     timeline_id,
-                    port,
+                    pg_port,
+                    http_port,
                     pg_version,
                     mode,
                 )?;
-                ep.start(&auth_token)?;
+                ep.start(&auth_token, safekeepers)?;
             }
         }
         "stop" => {
@@ -944,11 +970,22 @@ fn cli() -> Command {
         .value_parser(value_parser!(u32))
         .default_value(DEFAULT_PG_VERSION);
 
-    let port_arg = Arg::new("port")
-        .long("port")
+    let pg_port_arg = Arg::new("pg-port")
+        .long("pg-port")
         .required(false)
         .value_parser(value_parser!(u16))
-        .value_name("port");
+        .value_name("pg-port");
+
+    let http_port_arg = Arg::new("http-port")
+        .long("http-port")
+        .required(false)
+        .value_parser(value_parser!(u16))
+        .value_name("http-port");
+
+    let safekeepers_arg = Arg::new("safekeepers")
+        .long("safekeepers")
+        .required(false)
+        .value_name("safekeepers");
 
     let stop_mode_arg = Arg::new("stop-mode")
         .short('m')
@@ -1093,7 +1130,8 @@ fn cli() -> Command {
                 .arg(branch_name_arg.clone())
                 .arg(tenant_id_arg.clone())
                 .arg(lsn_arg.clone())
-                .arg(port_arg.clone())
+                .arg(pg_port_arg.clone())
+                .arg(http_port_arg.clone())
                 .arg(
                     Arg::new("config-only")
                         .help("Don't do basebackup, create endpoint directory with only config files")
@@ -1109,9 +1147,11 @@ fn cli() -> Command {
                 .arg(branch_name_arg)
                 .arg(timeline_id_arg)
                 .arg(lsn_arg)
-                .arg(port_arg)
+                .arg(pg_port_arg)
+                .arg(http_port_arg)
                 .arg(pg_version_arg)
                 .arg(hot_standby_arg)
+                .arg(safekeepers_arg)
         )
         .subcommand(
             Command::new("stop")
diff --git a/control_plane/src/broker.rs b/control_plane/src/broker.rs
index 6c0604a0765e..ad19dfa2049a 100644
--- a/control_plane/src/broker.rs
+++ b/control_plane/src/broker.rs
@@ -1,3 +1,9 @@
+//! Code to manage the storage broker
+//!
+//! In the local test environment, a single storage broker process is
+//! shared by all the safekeepers and pageservers. They use it to exchange
+//! information about timelines.
+//!
 use anyhow::Context;
 use std::path::PathBuf;
 
diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs
index cc5a7a416890..a4ee2f3a711e 100644
--- a/control_plane/src/endpoint.rs
+++ b/control_plane/src/endpoint.rs
@@ -1,40 +1,71 @@
+//! Code to manage compute endpoints
+//!
+//! In the local test environment, the data for each endpoint is stored in
+//!
+//!   .neon/endpoints/<endpoint ID>
+//!
+//! Some basic information about the endpoint, like the tenant and timeline IDs,
+//! is stored in the `endpoint.json` file. The `endpoint.json` file is created
+//! when the endpoint is created, and doesn't change afterwards.
+//!
+//! The endpoint is managed by the `compute_ctl` binary. When an endpoint is
+//! started, we launch `compute_ctl`. It synchronizes the safekeepers, downloads
+//! the basebackup from the pageserver to initialize the data directory, and
+//! finally launches the PostgreSQL process. It watches the PostgreSQL process
+//! until it exits.
+//!
+//! When an endpoint is created, a `postgresql.conf` file is also created in
+//! the endpoint's directory. The file can be modified before starting PostgreSQL.
+//! However, the `postgresql.conf` file in the endpoint directory is not used directly
+//! by PostgreSQL. It is passed to `compute_ctl`, and `compute_ctl` writes another
+//! copy of it in the data directory.
+//!
+//! Directory contents:
+//!
+//! ```ignore
+//! .neon/endpoints/main/
+//!     compute.log       - log output of `compute_ctl` and `postgres`
+//!     endpoint.json     - serialized `EndpointConf` struct
+//!     postgresql.conf   - postgresql settings
+//!     spec.json         - passed to `compute_ctl`
+//!     pgdata/
+//!         postgresql.conf   - copy of postgresql.conf created by `compute_ctl`
+//!         zenith.signal
+//!
+//! ```
+//!
 use std::collections::BTreeMap;
-use std::fs::{self, File};
-use std::io::Write;
 use std::net::SocketAddr;
 use std::net::TcpStream;
-use std::os::unix::fs::PermissionsExt;
 use std::path::PathBuf;
-use std::process::{Command, Stdio};
-use std::str::FromStr;
+use std::process::Command;
 use std::sync::Arc;
 use std::time::Duration;
 
-use anyhow::{Context, Result};
+use anyhow::{anyhow, bail, Context, Result};
 use serde::{Deserialize, Serialize};
 use serde_with::{serde_as, DisplayFromStr};
-use utils::{
-    id::{TenantId, TimelineId},
-    lsn::Lsn,
-};
+use utils::id::{NodeId, TenantId, TimelineId};
 
 use crate::local_env::LocalEnv;
 use crate::pageserver::PageServerNode;
 use crate::postgresql_conf::PostgresConf;
 
-use compute_api::spec::ComputeMode;
+use compute_api::responses::{ComputeState, ComputeStatus};
+use compute_api::spec::{Cluster, ComputeMode, ComputeSpec};
 
 // contents of a endpoint.json file
 #[serde_as]
 #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
 pub struct EndpointConf {
-    name: String,
+    endpoint_id: String,
     #[serde_as(as = "DisplayFromStr")]
     tenant_id: TenantId,
     #[serde_as(as = "DisplayFromStr")]
     timeline_id: TimelineId,
     mode: ComputeMode,
-    port: u16,
+    pg_port: u16,
+    http_port: u16,
     pg_version: u32,
 }
 
@@ -57,11 +88,11 @@ impl ComputeControlPlane {
         let pageserver = Arc::new(PageServerNode::from_env(&env));
 
         let mut endpoints = BTreeMap::default();
-        for endpoint_dir in fs::read_dir(env.endpoints_path())
+        for endpoint_dir in std::fs::read_dir(env.endpoints_path())
            .with_context(|| format!("failed to list {}", env.endpoints_path().display()))?
         {
             let ep = Endpoint::from_dir_entry(endpoint_dir?, &env, &pageserver)?;
-            endpoints.insert(ep.name.clone(), Arc::new(ep));
+            endpoints.insert(ep.endpoint_id.clone(), Arc::new(ep));
         }
 
         Ok(ComputeControlPlane {
@@ -76,25 +107,28 @@ impl ComputeControlPlane {
         1 + self
             .endpoints
             .values()
-            .map(|ep| ep.address.port())
+            .map(|ep| std::cmp::max(ep.pg_address.port(), ep.http_address.port()))
             .max()
             .unwrap_or(self.base_port)
     }
 
+    #[allow(clippy::too_many_arguments)]
     pub fn new_endpoint(
         &mut self,
+        endpoint_id: &str,
         tenant_id: TenantId,
-        name: &str,
         timeline_id: TimelineId,
-        port: Option<u16>,
+        pg_port: Option<u16>,
+        http_port: Option<u16>,
         pg_version: u32,
         mode: ComputeMode,
     ) -> Result<Arc<Endpoint>> {
-        let port = port.unwrap_or_else(|| self.get_port());
-
+        let pg_port = pg_port.unwrap_or_else(|| self.get_port());
+        let http_port = http_port.unwrap_or_else(|| self.get_port() + 1);
         let ep = Arc::new(Endpoint {
-            name: name.to_owned(),
-            address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
+            endpoint_id: endpoint_id.to_owned(),
+            pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), pg_port),
+            http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), http_port),
             env: self.env.clone(),
             pageserver: Arc::clone(&self.pageserver),
             timeline_id,
@@ -102,21 +136,27 @@ impl ComputeControlPlane {
             tenant_id,
             pg_version,
         });
-        ep.create_pgdata()?;
+
+        ep.create_endpoint_dir()?;
         std::fs::write(
             ep.endpoint_path().join("endpoint.json"),
             serde_json::to_string_pretty(&EndpointConf {
-                name: name.to_string(),
+                endpoint_id: endpoint_id.to_string(),
                 tenant_id,
                 timeline_id,
                 mode,
-                port,
+                http_port,
+                pg_port,
                 pg_version,
             })?,
         )?;
-        ep.setup_pg_conf()?;
+        std::fs::write(
+            ep.endpoint_path().join("postgresql.conf"),
+            ep.setup_pg_conf()?.to_string(),
+        )?;
 
-        self.endpoints.insert(ep.name.clone(), Arc::clone(&ep));
+        self.endpoints
+            .insert(ep.endpoint_id.clone(), Arc::clone(&ep));
 
         Ok(ep)
     }
@@ -127,13 +167,15 @@ impl ComputeControlPlane {
 #[derive(Debug)]
 pub struct Endpoint {
     /// used as the directory name
-    name: String,
+    endpoint_id: String,
     pub tenant_id: TenantId,
     pub timeline_id: TimelineId,
     pub mode: ComputeMode,
 
-    // port and address of the Postgres server
-    pub address: SocketAddr,
+    // port and address of the Postgres server and `compute_ctl`'s HTTP API
+    pub pg_address: SocketAddr,
+    pub http_address: SocketAddr,
+
     // postgres major version in the format: 14, 15, etc.
     pg_version: u32,
 
@@ -158,16 +200,16 @@ impl Endpoint {
 
         // parse data directory name
         let fname = entry.file_name();
-        let name = fname.to_str().unwrap().to_string();
+        let endpoint_id = fname.to_str().unwrap().to_string();
 
         // Read the endpoint.json file
         let conf: EndpointConf =
             serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;
 
-        // ok now
         Ok(Endpoint {
-            address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.port),
-            name,
+            pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.pg_port),
+            http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.http_port),
+            endpoint_id,
             env: env.clone(),
             pageserver: Arc::clone(pageserver),
             timeline_id: conf.timeline_id,
@@ -177,109 +219,27 @@ impl Endpoint {
         })
     }
 
-    fn sync_safekeepers(&self, auth_token: &Option<String>, pg_version: u32) -> Result<Lsn> {
-        let pg_path = self.env.pg_bin_dir(pg_version)?.join("postgres");
-        let mut cmd = Command::new(pg_path);
-
-        cmd.arg("--sync-safekeepers")
-            .env_clear()
-            .env(
-                "LD_LIBRARY_PATH",
-                self.env.pg_lib_dir(pg_version)?.to_str().unwrap(),
-            )
-            .env(
-                "DYLD_LIBRARY_PATH",
-                self.env.pg_lib_dir(pg_version)?.to_str().unwrap(),
-            )
-            .env("PGDATA", self.pgdata().to_str().unwrap())
-            .stdout(Stdio::piped())
-            // Comment this to avoid capturing stderr (useful if command hangs)
-            .stderr(Stdio::piped());
-
-        if let Some(token) = auth_token {
-            cmd.env("NEON_AUTH_TOKEN", token);
-        }
-
-        let sync_handle = cmd
-            .spawn()
-            .expect("postgres --sync-safekeepers failed to start");
-
-        let sync_output = sync_handle
-            .wait_with_output()
-            .expect("postgres --sync-safekeepers failed");
-        if !sync_output.status.success() {
-            anyhow::bail!(
-                "sync-safekeepers failed: '{}'",
-                String::from_utf8_lossy(&sync_output.stderr)
-            );
-        }
-
-        let lsn = Lsn::from_str(std::str::from_utf8(&sync_output.stdout)?.trim())?;
-        println!("Safekeepers synced on {}", lsn);
-        Ok(lsn)
-    }
-
-    /// Get basebackup from the pageserver as a tar archive and extract it
-    /// to the `self.pgdata()` directory.
-    fn do_basebackup(&self, lsn: Option<Lsn>) -> Result<()> {
-        println!(
-            "Extracting base backup to create postgres instance: path={} port={}",
-            self.pgdata().display(),
-            self.address.port()
-        );
-
-        let sql = if let Some(lsn) = lsn {
-            format!("basebackup {} {} {}", self.tenant_id, self.timeline_id, lsn)
-        } else {
-            format!("basebackup {} {}", self.tenant_id, self.timeline_id)
-        };
-
-        let mut client = self
-            .pageserver
-            .page_server_psql_client()
-            .context("connecting to page server failed")?;
-
-        let copyreader = client
-            .copy_out(sql.as_str())
-            .context("page server 'basebackup' command failed")?;
-
-        // Read the archive directly from the `CopyOutReader`
-        //
-        // Set `ignore_zeros` so that unpack() reads all the Copy data and
-        // doesn't stop at the end-of-archive marker. Otherwise, if the server
-        // sends an Error after finishing the tarball, we will not notice it.
-        let mut ar = tar::Archive::new(copyreader);
-        ar.set_ignore_zeros(true);
-        ar.unpack(&self.pgdata())
-            .context("extracting base backup failed")?;
-
-        Ok(())
-    }
-
-    fn create_pgdata(&self) -> Result<()> {
-        fs::create_dir_all(self.pgdata()).with_context(|| {
+    fn create_endpoint_dir(&self) -> Result<()> {
+        std::fs::create_dir_all(self.endpoint_path()).with_context(|| {
             format!(
-                "could not create data directory {}",
-                self.pgdata().display()
+                "could not create endpoint directory {}",
+                self.endpoint_path().display()
             )
-        })?;
-        fs::set_permissions(self.pgdata().as_path(), fs::Permissions::from_mode(0o700))
-            .with_context(|| {
-                format!(
-                    "could not set permissions in data directory {}",
-                    self.pgdata().display()
-                )
-            })
+        })
     }
 
-    // Write postgresql.conf with default configuration
-    // and PG_VERSION file to the data directory of a new endpoint.
-    fn setup_pg_conf(&self) -> Result<()> {
+    // Generate postgresql.conf with default configuration
+    fn setup_pg_conf(&self) -> Result<PostgresConf> {
         let mut conf = PostgresConf::new();
         conf.append("max_wal_senders", "10");
         conf.append("wal_log_hints", "off");
         conf.append("max_replication_slots", "10");
         conf.append("hot_standby", "on");
+        // prefetching of blocks referenced in WAL doesn't make sense for us
+        // Neon hot standby ignores pages that are not in the shared_buffers
+        if self.pg_version >= 15 {
+            conf.append("recovery_prefetch", "off");
+        }
         conf.append("shared_buffers", "1MB");
         conf.append("fsync", "off");
         conf.append("max_connections", "100");
@@ -287,149 +247,53 @@ impl Endpoint {
         // wal_sender_timeout is the maximum time to wait for WAL replication.
         // It also defines how often the walreciever will send a feedback message to the wal sender.
         conf.append("wal_sender_timeout", "5s");
-        conf.append("listen_addresses", &self.address.ip().to_string());
-        conf.append("port", &self.address.port().to_string());
+        conf.append("listen_addresses", &self.pg_address.ip().to_string());
+        conf.append("port", &self.pg_address.port().to_string());
         conf.append("wal_keep_size", "0");
         // walproposer panics when basebackup is invalid, it is pointless to restart in this case.
         conf.append("restart_after_crash", "off");
 
-        // Configure the Neon Postgres extension to fetch pages from pageserver
-        let pageserver_connstr = {
-            let config = &self.pageserver.pg_connection_config;
-            let (host, port) = (config.host(), config.port());
-
-            // NOTE: avoid spaces in connection string, because it is less error prone if we forward it somewhere.
-            format!("postgresql://no_user@{host}:{port}")
-        };
+        // Load the 'neon' extension
         conf.append("shared_preload_libraries", "neon");
 
         conf.append_line("");
-        conf.append("neon.pageserver_connstring", &pageserver_connstr);
-        conf.append("neon.tenant_id", &self.tenant_id.to_string());
-        conf.append("neon.timeline_id", &self.timeline_id.to_string());
-        conf.append_line("");
 
-        // Replication-related configurations, such as WAL sending
-        match &self.mode {
-            ComputeMode::Primary => {
-                // Configure backpressure
-                // - Replication write lag depends on how fast the walreceiver can process incoming WAL.
-                //   This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
-                //   so to avoid expiration of 1 minute timeout, this lag should not be larger than 600MB.
-                //   Actually latency should be much smaller (better if < 1sec). But we assume that recently
-                //   updates pages are not requested from pageserver.
-                // - Replication flush lag depends on speed of persisting data by checkpointer (creation of
-                //   delta/image layers) and advancing disk_consistent_lsn. Safekeepers are able to
-                //   remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause long
-                //   recovery time (in case of pageserver crash) and disk space overflow at safekeepers.
-                // - Replication apply lag depends on speed of uploading changes to S3 by uploader thread.
-                //   To be able to restore database in case of pageserver node crash, safekeeper should not
-                //   remove WAL beyond this point. Too large lag can cause space exhaustion in safekeepers
-                //   (if they are not able to upload WAL to S3).
-                conf.append("max_replication_write_lag", "15MB");
-                conf.append("max_replication_flush_lag", "10GB");
-
-                if !self.env.safekeepers.is_empty() {
-                    // Configure Postgres to connect to the safekeepers
-                    conf.append("synchronous_standby_names", "walproposer");
-
-                    let safekeepers = self
-                        .env
-                        .safekeepers
-                        .iter()
-                        .map(|sk| format!("localhost:{}", sk.pg_port))
-                        .collect::<Vec<String>>()
-                        .join(",");
-                    conf.append("neon.safekeepers", &safekeepers);
-                } else {
-                    // We only use setup without safekeepers for tests,
-                    // and don't care about data durability on pageserver,
-                    // so set more relaxed synchronous_commit.
-                    conf.append("synchronous_commit", "remote_write");
-
-                    // Configure the node to stream WAL directly to the pageserver
-                    // This isn't really a supported configuration, but can be useful for
-                    // testing.
-                    conf.append("synchronous_standby_names", "pageserver");
-                }
-            }
-            ComputeMode::Static(lsn) => {
-                conf.append("recovery_target_lsn", &lsn.to_string());
-            }
-            ComputeMode::Replica => {
-                assert!(!self.env.safekeepers.is_empty());
-
-                // TODO: use future host field from safekeeper spec
-                // Pass the list of safekeepers to the replica so that it can connect to any of them,
-                // whichever is availiable.
-                let sk_ports = self
-                    .env
-                    .safekeepers
-                    .iter()
-                    .map(|x| x.pg_port.to_string())
-                    .collect::<Vec<String>>()
-                    .join(",");
-                let sk_hosts = vec!["localhost"; self.env.safekeepers.len()].join(",");
-
-                let connstr = format!(
-                    "host={} port={} options='-c timeline_id={} tenant_id={}' application_name=replica replication=true",
-                    sk_hosts,
-                    sk_ports,
-                    &self.timeline_id.to_string(),
-                    &self.tenant_id.to_string(),
-                );
-
-                let slot_name = format!("repl_{}_", self.timeline_id);
-                conf.append("primary_conninfo", connstr.as_str());
-                conf.append("primary_slot_name", slot_name.as_str());
-                conf.append("hot_standby", "on");
-                // prefetching of blocks referenced in WAL doesn't make sense for us
-                // Neon hot standby ignores pages that are not in the shared_buffers
-                if self.pg_version >= 15 {
-                    conf.append("recovery_prefetch", "off");
-                }
-            }
+        // Configure backpressure
+        // - Replication write lag depends on how fast the walreceiver can process incoming WAL.
+        //   This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
+        //   so to avoid expiration of 1 minute timeout, this lag should not be larger than 600MB.
+        //   Actually latency should be much smaller (better if < 1sec). But we assume that recently
+        //   updated pages are not requested from pageserver.
+        // - Replication flush lag depends on speed of persisting data by checkpointer (creation of
+        //   delta/image layers) and advancing disk_consistent_lsn. Safekeepers are able to
+        //   remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause long
+        //   recovery time (in case of pageserver crash) and disk space overflow at safekeepers.
+        // - Replication apply lag depends on speed of uploading changes to S3 by uploader thread.
+        //   To be able to restore database in case of pageserver node crash, safekeeper should not
+        //   remove WAL beyond this point. Too large a lag can cause space exhaustion in safekeepers
+        //   (if they are not able to upload WAL to S3).
+        conf.append("max_replication_write_lag", "15MB");
+        conf.append("max_replication_flush_lag", "10GB");
+
+        if !self.env.safekeepers.is_empty() {
+            // Configure Postgres to connect to the safekeepers
+            conf.append("synchronous_standby_names", "walproposer");
+        } else {
+            // We only use setup without safekeepers for tests,
+            // and don't care about data durability on pageserver,
+            // so set more relaxed synchronous_commit.
+            conf.append("synchronous_commit", "remote_write");
+
+            // Configure the node to stream WAL directly to the pageserver
+            // This isn't really a supported configuration, but can be useful for
+            // testing.
+            conf.append("synchronous_standby_names", "pageserver");
         }
 
-        let mut file = File::create(self.pgdata().join("postgresql.conf"))?;
-        file.write_all(conf.to_string().as_bytes())?;
-
-        let mut file = File::create(self.pgdata().join("PG_VERSION"))?;
-        file.write_all(self.pg_version.to_string().as_bytes())?;
-
-        Ok(())
-    }
-
-    fn load_basebackup(&self, auth_token: &Option<String>) -> Result<()> {
-        let backup_lsn = match &self.mode {
-            ComputeMode::Primary => {
-                if !self.env.safekeepers.is_empty() {
-                    // LSN 0 means that it is bootstrap and we need to download just
-                    // latest data from the pageserver. That is a bit clumsy but whole bootstrap
-                    // procedure evolves quite actively right now, so let's think about it again
-                    // when things would be more stable (TODO).
-                    let lsn = self.sync_safekeepers(auth_token, self.pg_version)?;
-                    if lsn == Lsn(0) {
-                        None
-                    } else {
-                        Some(lsn)
-                    }
-                } else {
-                    None
-                }
-            }
-            ComputeMode::Static(lsn) => Some(*lsn),
-            ComputeMode::Replica => {
-                None // Take the latest snapshot available to start with
-            }
-        };
-
-        self.do_basebackup(backup_lsn)?;
-
-        Ok(())
+        Ok(conf)
     }
 
     pub fn endpoint_path(&self) -> PathBuf {
-        self.env.endpoints_path().join(&self.name)
+        self.env.endpoints_path().join(&self.endpoint_id)
     }
 
     pub fn pgdata(&self) -> PathBuf {
@@ -439,7 +303,7 @@ impl Endpoint {
     pub fn status(&self) -> &str {
         let timeout = Duration::from_millis(300);
         let has_pidfile = self.pgdata().join("postmaster.pid").exists();
-        let can_connect = TcpStream::connect_timeout(&self.address, timeout).is_ok();
+        let can_connect = TcpStream::connect_timeout(&self.pg_address, timeout).is_ok();
 
         match (has_pidfile, can_connect) {
             (true, true) => "running",
@@ -457,8 +321,6 @@ impl Endpoint {
             &[
                 "-D",
                 self.pgdata().to_str().unwrap(),
-                "-l",
-                self.pgdata().join("pg.log").to_str().unwrap(),
                 "-w", //wait till pg_ctl actually does what was asked
             ],
             args,
@@ -494,36 +356,181 @@ impl Endpoint {
         Ok(())
     }
 
-    pub fn start(&self, auth_token: &Option<String>) -> Result<()> {
+    pub fn start(&self, auth_token: &Option<String>, safekeepers: Vec<NodeId>) -> Result<()> {
         if self.status() == "running" {
             anyhow::bail!("The endpoint is already running");
         }
 
-        // 1. We always start Postgres from scratch, so
-        //    if old dir exists, preserve 'postgresql.conf' and drop the directory
-        let postgresql_conf_path = self.pgdata().join("postgresql.conf");
-        let postgresql_conf = fs::read(&postgresql_conf_path).with_context(|| {
-            format!(
-                "failed to read config file in {}",
-                postgresql_conf_path.to_str().unwrap()
-            )
-        })?;
-        fs::remove_dir_all(self.pgdata())?;
-        self.create_pgdata()?;
+        // Slurp the endpoints/<endpoint id>/postgresql.conf file into
+        // memory. We will include it in the spec file that we pass to
+        // `compute_ctl`, and `compute_ctl` will write it to the postgresql.conf
+        // in the data directory.
+        let postgresql_conf_path = self.endpoint_path().join("postgresql.conf");
+        let postgresql_conf = match std::fs::read(&postgresql_conf_path) {
+            Ok(content) => String::from_utf8(content)?,
+            Err(e) if e.kind() == std::io::ErrorKind::NotFound => "".to_string(),
+            Err(e) => {
+                return Err(anyhow::Error::new(e).context(format!(
+                    "failed to read config file in {}",
+                    postgresql_conf_path.to_str().unwrap()
+                )))
+            }
+        };
+
+        // We always start the compute node from scratch, so if the Postgres
+        // data dir exists from a previous launch, remove it first.
+        if self.pgdata().exists() {
+            std::fs::remove_dir_all(self.pgdata())?;
+        }
 
-        // 2. Bring back config files
-        fs::write(&postgresql_conf_path, postgresql_conf)?;
+        let pageserver_connstring = {
+            let config = &self.pageserver.pg_connection_config;
+            let (host, port) = (config.host(), config.port());
 
-        // 3. Load basebackup
-        self.load_basebackup(auth_token)?;
+            // NOTE: avoid spaces in connection string, because it is less error prone if we forward it somewhere.
+            format!("postgresql://no_user@{host}:{port}")
+        };
+        let mut safekeeper_connstrings = Vec::new();
+        for sk_id in safekeepers {
+            let sk = self
+                .env
+                .safekeepers
+                .iter()
+                .find(|node| node.id == sk_id)
+                .ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
+            safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.pg_port));
+        }
 
-        if self.mode != ComputeMode::Primary {
-            File::create(self.pgdata().join("standby.signal"))?;
+        // Create spec file
+        let spec = ComputeSpec {
+            format_version: 1.0,
+            operation_uuid: None,
+            cluster: Cluster {
+                cluster_id: None, // project ID: not used
+                name: None,       // project name: not used
+                state: None,
+                roles: vec![],
+                databases: vec![],
+                settings: None,
+                postgresql_conf: Some(postgresql_conf),
+            },
+            delta_operations: None,
+            tenant_id: Some(self.tenant_id),
+            timeline_id: Some(self.timeline_id),
+            mode: self.mode,
+            pageserver_connstring: Some(pageserver_connstring),
+            safekeeper_connstrings,
+            storage_auth_token: auth_token.clone(),
+        };
+        let spec_path = self.endpoint_path().join("spec.json");
+        std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
+
+        // Open log file. We'll redirect the stdout and stderr of `compute_ctl` to it.
+        let logfile = std::fs::OpenOptions::new()
+            .create(true)
+            .append(true)
+            .open(self.endpoint_path().join("compute.log"))?;
+
+        // Launch compute_ctl
+        println!("Starting postgres node at '{}'", self.connstr());
+        let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
+        cmd.args(["--http-port", &self.http_address.port().to_string()])
+            .args(["--pgdata", self.pgdata().to_str().unwrap()])
+            .args(["--connstr", &self.connstr()])
+            .args([
+                "--spec-path",
+                self.endpoint_path().join("spec.json").to_str().unwrap(),
+            ])
+            .args([
+                "--pgbin",
+                self.env
+                    .pg_bin_dir(self.pg_version)?
+                    .join("postgres")
+                    .to_str()
+                    .unwrap(),
+            ])
+            .stdin(std::process::Stdio::null())
+            .stderr(logfile.try_clone()?)
+            .stdout(logfile);
+        let _child = cmd.spawn()?;
+
+        // Wait for it to start
+        let mut attempt = 0;
+        const ATTEMPT_INTERVAL: Duration = Duration::from_millis(100);
+        const MAX_ATTEMPTS: u32 = 10 * 30; // Wait up to 30 s
+        loop {
+            attempt += 1;
+            match self.get_status() {
+                Ok(state) => {
+                    match state.status {
+                        ComputeStatus::Init => {
+                            if attempt == MAX_ATTEMPTS {
+                                bail!("compute startup timed out; still in Init state");
+                            }
+                            // keep retrying
+                        }
+                        ComputeStatus::Running => {
+                            // All good!
+                            break;
+                        }
+                        ComputeStatus::Failed => {
+                            bail!(
+                                "compute startup failed: {}",
+                                state
+                                    .error
+                                    .as_deref()
+                                    .unwrap_or("<unknown error>")
+                            );
+                        }
+                        ComputeStatus::Empty
+                        | ComputeStatus::ConfigurationPending
+                        | ComputeStatus::Configuration => {
+                            bail!("unexpected compute status: {:?}", state.status)
+                        }
+                    }
+                }
+                Err(e) => {
+                    if attempt == MAX_ATTEMPTS {
+                        return Err(e)
+                            .context("timed out waiting to connect to compute_ctl HTTP endpoint");
+                    }
+                }
+            }
+            std::thread::sleep(ATTEMPT_INTERVAL);
         }
 
-        // 4. Finally start postgres
-        println!("Starting postgres at '{}'", self.connstr());
-        self.pg_ctl(&["start"], auth_token)
+        Ok(())
+    }
+
+    // Call the /status HTTP API
+    pub fn get_status(&self) -> Result<ComputeState> {
+        let client = reqwest::blocking::Client::new();
+
+        let response = client
+            .request(
+                reqwest::Method::GET,
+                format!(
+                    "http://{}:{}/status",
+                    self.http_address.ip(),
+                    self.http_address.port()
+                ),
+            )
+            .send()?;
+
+        // Interpret the response
+        let status = response.status();
+        if !(status.is_client_error() || status.is_server_error()) {
+            Ok(response.json()?)
+        } else {
+            // reqwest does not export its error construction utility functions, so let's craft the message ourselves
+            let url = response.url().to_owned();
+            let msg = match response.text() {
+                Ok(err_body) => format!("Error: {}", err_body),
+                Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
+            };
+            Err(anyhow::anyhow!(msg))
+        }
     }
 
     pub fn stop(&self, destroy: bool) -> Result<()> {
@@ -540,7 +547,7 @@ impl Endpoint {
                 "Destroying postgres data directory '{}'",
                 self.pgdata().to_str().unwrap()
             );
-            fs::remove_dir_all(self.endpoint_path())?;
+            std::fs::remove_dir_all(self.endpoint_path())?;
         } else {
             self.pg_ctl(&["stop"], &None)?;
         }
@@ -549,10 +556,10 @@ impl Endpoint {
 
     pub fn connstr(&self) -> String {
         format!(
-            "host={} port={} user={} dbname={}",
-            self.address.ip(),
-            self.address.port(),
+            "postgresql://{}@{}:{}/{}",
             "cloud_admin",
+            self.pg_address.ip(),
+            self.pg_address.port(),
             "postgres"
         )
     }
diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs
index 9286944412dd..df70cb313928 100644
--- a/control_plane/src/local_env.rs
+++ b/control_plane/src/local_env.rs
@@ -37,7 +37,7 @@ pub const DEFAULT_PG_VERSION: u32 = 15;
 #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
 pub struct LocalEnv {
     // Base directory for all the nodes (the pageserver, safekeepers and
-    // compute nodes).
+    // compute endpoints).
     //
     // This is not stored in the config file. Rather, this is the path where the
     // config file itself is. It is read from the NEON_REPO_DIR env variable or
diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs
index 400df60f0e50..2ff09021e536 100644
--- a/control_plane/src/pageserver.rs
+++ b/control_plane/src/pageserver.rs
@@ -1,3 +1,9 @@
+//! Code to manage pageservers
+//!
+//! In the local test environment, the pageserver stores its data directly in
+//!
+//!   .neon/
+//!
 use std::borrow::Cow;
 use std::collections::HashMap;
 use std::fs::File;
diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs
index d358f7334325..9e053ff1f19f 100644
--- a/control_plane/src/safekeeper.rs
+++ b/control_plane/src/safekeeper.rs
@@ -1,3 +1,9 @@
+//! Code to manage safekeepers
+//!
+//! In the local test environment, the data for each safekeeper is stored in
+//!
+//!   .neon/safekeepers/<safekeeper id>
+//!
 use std::io::Write;
 use std::path::PathBuf;
 use std::process::Child;
diff --git a/libs/compute_api/src/responses.rs b/libs/compute_api/src/responses.rs
index d181c018b1be..ce73dda08ad8 100644
--- a/libs/compute_api/src/responses.rs
+++ b/libs/compute_api/src/responses.rs
@@ -5,13 +5,13 @@ use serde::{Deserialize, Serialize, Serializer};
 
 use crate::spec::ComputeSpec;
 
-#[derive(Serialize, Debug)]
+#[derive(Serialize, Debug, Deserialize)]
 pub struct GenericAPIError {
     pub error: String,
 }
 
 /// Response of the /status API
-#[derive(Serialize, Debug)]
+#[derive(Serialize, Debug, Deserialize)]
 #[serde(rename_all = "snake_case")]
 pub struct ComputeStatusResponse {
     pub start_time: DateTime<Utc>,
@@ -23,7 +23,7 @@ pub struct ComputeStatusResponse {
     pub error: Option<String>,
 }
 
-#[derive(Serialize)]
+#[derive(Deserialize, Serialize)]
 #[serde(rename_all = "snake_case")]
 pub struct ComputeState {
     pub status: ComputeStatus,
@@ -33,7 +33,7 @@ pub struct ComputeState {
     pub error: Option<String>,
 }
 
-#[derive(Serialize, Clone, Copy, Debug, PartialEq, Eq)]
+#[derive(Serialize, Clone, Copy, Debug, Deserialize, PartialEq, Eq)]
 #[serde(rename_all = "snake_case")]
 pub enum ComputeStatus {
     // Spec wasn't provided at start, waiting for it to be
diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs
index 770eca23b708..4014774a7ed0 100644
--- a/libs/compute_api/src/spec.rs
+++ b/libs/compute_api/src/spec.rs
@@ -5,6 +5,7 @@
 //! and connect it to the storage nodes.
 use serde::{Deserialize, Serialize};
 use serde_with::{serde_as, DisplayFromStr};
+use utils::id::{TenantId, TimelineId};
 use utils::lsn::Lsn;
 
 /// String type alias representing Postgres identifier and
@@ -14,7 +15,7 @@ pub type PgIdent = String;
 /// Cluster spec or configuration represented as an optional number of
 /// delta operations + final cluster state description.
 #[serde_as]
-#[derive(Clone, Debug, Default, Deserialize)]
+#[derive(Clone, Debug, Default, Deserialize, Serialize)]
 pub struct ComputeSpec {
     pub format_version: f32,
 
@@ -26,9 +27,32 @@ pub struct ComputeSpec {
     pub cluster: Cluster,
     pub delta_operations: Option<Vec<DeltaOp>>,
 
+    // Information needed to connect to the storage layer.
+    //
+    // `tenant_id`, `timeline_id` and `pageserver_connstring` are always needed.
+    //
+    // Depending on `mode`, this can be a primary read-write node, a read-only
+    // replica, or a read-only node pinned at an older LSN.
+    // `safekeeper_connstrings` must be set for a primary.
+    //
+    // For backwards compatibility, the control plane may leave out all of
+    // these, and instead set the "neon.tenant_id", "neon.timeline_id",
+    // etc. GUCs in cluster.settings. TODO: Once the control plane has been
+    // updated to fill these fields, we can make these non-optional.
+    #[serde_as(as = "Option<DisplayFromStr>")]
+    pub tenant_id: Option<TenantId>,
+    #[serde_as(as = "Option<DisplayFromStr>")]
+    pub timeline_id: Option<TimelineId>,
+    #[serde_as(as = "Option<DisplayFromStr>")]
+    pub pageserver_connstring: Option<String>,
+    #[serde(default)]
+    pub safekeeper_connstrings: Vec<String>,
+
     #[serde(default)]
     pub mode: ComputeMode,
 
+    /// If set, 'storage_auth_token' is used as the password to authenticate to
+    /// the pageserver and safekeepers.
     pub storage_auth_token: Option<String>,
 }
 
@@ -47,13 +71,19 @@ pub enum ComputeMode {
     Replica,
 }
 
-#[derive(Clone, Debug, Default, Deserialize)]
+#[derive(Clone, Debug, Default, Deserialize, Serialize)]
 pub struct Cluster {
     pub cluster_id: Option<String>,
     pub name: Option<String>,
     pub state: Option<String>,
     pub roles: Vec<Role>,
     pub databases: Vec<Database>,
+
+    /// Desired contents of 'postgresql.conf' file. (The 'compute_ctl'
+    /// tool may add additional settings to the final file.)
+    pub postgresql_conf: Option<String>,
+
+    /// Additional settings that will be appended to the 'postgresql.conf' file.
     pub settings: GenericOptions,
 }
 
@@ -63,7 +93,7 @@ pub struct Cluster {
 /// - DROP ROLE
 /// - ALTER ROLE name RENAME TO new_name
 /// - ALTER DATABASE name RENAME TO new_name
-#[derive(Clone, Debug, Deserialize)]
+#[derive(Clone, Debug, Deserialize, Serialize)]
 pub struct DeltaOp {
     pub action: String,
     pub name: PgIdent,
@@ -72,7 +102,7 @@ pub struct DeltaOp {
 
 /// Rust representation of Postgres role info with only those fields
 /// that matter for us.
-#[derive(Clone, Debug, Deserialize)]
+#[derive(Clone, Debug, Deserialize, Serialize)]
 pub struct Role {
     pub name: PgIdent,
     pub encrypted_password: Option<String>,
@@ -81,7 +111,7 @@ pub struct Role {
 
 /// Rust representation of Postgres database info with only those fields
 /// that matter for us.
-#[derive(Clone, Debug, Deserialize)]
+#[derive(Clone, Debug, Deserialize, Serialize)]
 pub struct Database {
     pub name: PgIdent,
     pub owner: PgIdent,
@@ -91,7 +121,7 @@ pub struct Database {
 /// Common type representing both SQL statement params with or without value,
 /// like `LOGIN` or `OWNER username` in the `CREATE/ALTER ROLE`, and config
 /// options like `wal_level = logical`.
-#[derive(Clone, Debug, Deserialize)]
+#[derive(Clone, Debug, Deserialize, Serialize)]
 pub struct GenericOption {
     pub name: String,
     pub value: Option<String>,
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index e23ed1287846..87bf78c2249e 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -1446,11 +1446,12 @@ def safekeeper_stop(
     def endpoint_create(
         self,
         branch_name: str,
+        pg_port: int,
+        http_port: int,
         endpoint_id: Optional[str] = None,
         tenant_id: Optional[TenantId] = None,
         hot_standby: bool = False,
         lsn: Optional[Lsn] = None,
-        port: Optional[int] = None,
     ) -> "subprocess.CompletedProcess[str]":
         args = [
             "endpoint",
@@ -1464,8 +1465,10 @@ def endpoint_create(
         ]
         if lsn is not None:
             args.extend(["--lsn", str(lsn)])
-        if port is not None:
-            args.extend(["--port", str(port)])
+        if pg_port is not None:
+            args.extend(["--pg-port", str(pg_port)])
+        if http_port is not None:
+            args.extend(["--http-port", str(http_port)])
         if endpoint_id is not None:
             args.append(endpoint_id)
         if hot_standby:
@@ -1478,9 +1481,11 @@ def endpoint_create(
     def endpoint_start(
         self,
         endpoint_id: str,
+        pg_port: int,
+        http_port: int,
+        safekeepers: Optional[List[int]] = None,
         tenant_id: Optional[TenantId] = None,
         lsn: Optional[Lsn] = None,
-        port: Optional[int] = None,
     ) -> "subprocess.CompletedProcess[str]":
         args = [
             "endpoint",
@@ -1492,8 +1497,10 @@ def endpoint_start(
         ]
         if lsn is not None:
             args.append(f"--lsn={lsn}")
-        if port is not None:
-            args.append(f"--port={port}")
+        args.extend(["--pg-port", str(pg_port)])
+        args.extend(["--http-port", str(http_port)])
+        if safekeepers is not None:
+            args.extend(["--safekeepers", (",".join(map(str, safekeepers)))])
         if endpoint_id is not None:
             args.append(endpoint_id)
 
@@ -2280,17 +2287,24 @@ class Endpoint(PgProtocol):
     """An object representing a Postgres compute endpoint managed by the control plane."""
 
     def __init__(
-        self, env: NeonEnv, tenant_id: TenantId, port: int, check_stop_result: bool = True
+        self,
+        env: NeonEnv,
+        tenant_id: TenantId,
+        pg_port: int,
+        http_port: int,
+        check_stop_result: bool = True,
     ):
-        super().__init__(host="localhost", port=port, user="cloud_admin", dbname="postgres")
+        super().__init__(host="localhost", port=pg_port, user="cloud_admin", dbname="postgres")
         self.env = env
         self.running = False
         self.branch_name: Optional[str] = None  # dubious
         self.endpoint_id: Optional[str] = None  # dubious, see asserts below
         self.pgdata_dir: Optional[str] = None  # Path to computenode PGDATA
         self.tenant_id = tenant_id
-        self.port = port
+        self.pg_port = pg_port
+        self.http_port = http_port
         self.check_stop_result = check_stop_result
+        self.active_safekeepers: List[int] = list(map(lambda sk: sk.id, env.safekeepers))
 
     # path to conf is <repo_dir>/endpoints/<endpoint_id>/pgdata/postgresql.conf
     def create(
@@ -2320,7 +2334,8 @@ def create(
             tenant_id=self.tenant_id,
             lsn=lsn,
             hot_standby=hot_standby,
-            port=self.port,
+            pg_port=self.pg_port,
+            http_port=self.http_port,
         )
         path = Path("endpoints") / self.endpoint_id / "pgdata"
         self.pgdata_dir = os.path.join(self.env.repo_dir, path)
@@ -2345,7 +2360,13 @@ def start(self) -> "Endpoint":
 
         log.info(f"Starting postgres endpoint {self.endpoint_id}")
 
-        self.env.neon_cli.endpoint_start(self.endpoint_id, tenant_id=self.tenant_id, port=self.port)
+        self.env.neon_cli.endpoint_start(
+            self.endpoint_id,
+            pg_port=self.pg_port,
+            http_port=self.http_port,
+            tenant_id=self.tenant_id,
+            safekeepers=self.active_safekeepers,
+        )
         self.running = True
 
         return self
@@ -2369,32
+2390,8 @@ def pg_twophase_dir_path(self) -> str: return os.path.join(self.pg_data_dir_path(), "pg_twophase") def config_file_path(self) -> str: - """Path to postgresql.conf""" - return os.path.join(self.pg_data_dir_path(), "postgresql.conf") - - def adjust_for_safekeepers(self, safekeepers: str) -> "Endpoint": - """ - Adjust instance config for working with wal acceptors instead of - pageserver (pre-configured by CLI) directly. - """ - - # TODO: reuse config() - with open(self.config_file_path(), "r") as f: - cfg_lines = f.readlines() - with open(self.config_file_path(), "w") as f: - for cfg_line in cfg_lines: - # walproposer uses different application_name - if ( - "synchronous_standby_names" in cfg_line - or - # don't repeat safekeepers/wal_acceptors multiple times - "neon.safekeepers" in cfg_line - ): - continue - f.write(cfg_line) - f.write("synchronous_standby_names = 'walproposer'\n") - f.write("neon.safekeepers = '{}'\n".format(safekeepers)) - return self + """Path to the postgresql.conf in the endpoint directory (not the one in pgdata)""" + return os.path.join(self.endpoint_path(), "postgresql.conf") def config(self, lines: List[str]) -> "Endpoint": """ @@ -2499,7 +2496,8 @@ def create_start( ep = Endpoint( self.env, tenant_id=tenant_id or self.env.initial_tenant, - port=self.env.port_distributor.get_port(), + pg_port=self.env.port_distributor.get_port(), + http_port=self.env.port_distributor.get_port(), ) self.num_instances += 1 self.endpoints.append(ep) @@ -2524,7 +2522,8 @@ def create( ep = Endpoint( self.env, tenant_id=tenant_id or self.env.initial_tenant, - port=self.env.port_distributor.get_port(), + pg_port=self.env.port_distributor.get_port(), + http_port=self.env.port_distributor.get_port(), ) if endpoint_id is None: @@ -2907,6 +2906,7 @@ def test_output_dir(request: FixtureRequest, top_output_dir: Path) -> Iterator[P "pg_internal.init", "pg.log", "zenith.signal", + "pg_hba.conf", "postgresql.conf", "postmaster.opts", "postmaster.pid", diff --git a/test_runner/regress/test_compatibility.py b/test_runner/regress/test_compatibility.py index fe8dc293c1f2..2635dbd93c3b 100644 --- a/test_runner/regress/test_compatibility.py +++ b/test_runner/regress/test_compatibility.py @@ -383,6 +383,9 @@ def check_neon_works( cli_target = NeonCli(config_target) # And the current binaries to launch computes + snapshot_config["neon_distrib_dir"] = str(neon_current_binpath) + with (snapshot_config_toml).open("w") as f: + toml.dump(snapshot_config, f) config_current = copy.copy(config) config_current.neon_binpath = neon_current_binpath cli_current = NeonCli(config_current) @@ -391,7 +394,8 @@ def check_neon_works( request.addfinalizer(lambda: cli_target.raw_cli(["stop"])) pg_port = port_distributor.get_port() - cli_current.endpoint_start("main", port=pg_port) + http_port = port_distributor.get_port() + cli_current.endpoint_start("main", pg_port=pg_port, http_port=http_port) request.addfinalizer(lambda: cli_current.endpoint_stop("main")) connstr = f"host=127.0.0.1 port={pg_port} user=cloud_admin dbname=postgres" diff --git a/test_runner/regress/test_compute_ctl.py b/test_runner/regress/test_compute_ctl.py deleted file mode 100644 index d72ffe078d83..000000000000 --- a/test_runner/regress/test_compute_ctl.py +++ /dev/null @@ -1,253 +0,0 @@ -import os -from pathlib import Path -from subprocess import TimeoutExpired - -from fixtures.log_helper import log -from fixtures.neon_fixtures import ComputeCtl, NeonEnvBuilder, PgBin - - -# Test that compute_ctl works and prints "--sync-safekeepers" logs. 
-def test_sync_safekeepers_logs(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): - neon_env_builder.num_safekeepers = 3 - env = neon_env_builder.init_start() - ctl = ComputeCtl(env) - - env.neon_cli.create_branch("test_compute_ctl", "main") - endpoint = env.endpoints.create_start("test_compute_ctl") - endpoint.safe_psql("CREATE TABLE t(key int primary key, value text)") - - with open(endpoint.config_file_path(), "r") as f: - cfg_lines = f.readlines() - cfg_map = {} - for line in cfg_lines: - if "=" in line: - k, v = line.split("=") - cfg_map[k] = v.strip("\n '\"") - log.info(f"postgres config: {cfg_map}") - pgdata = endpoint.pg_data_dir_path() - pg_bin_path = os.path.join(pg_bin.pg_bin_path, "postgres") - - endpoint.stop_and_destroy() - - # stop_and_destroy removes the whole endpoint directory. Recreate it. - Path(pgdata).mkdir(parents=True) - - spec = ( - """ -{ - "format_version": 1.0, - - "timestamp": "2021-05-23T18:25:43.511Z", - "operation_uuid": "0f657b36-4b0f-4a2d-9c2e-1dcd615e7d8b", - - "cluster": { - "cluster_id": "test-cluster-42", - "name": "Neon Test", - "state": "restarted", - "roles": [ - ], - "databases": [ - ], - "settings": [ - { - "name": "fsync", - "value": "off", - "vartype": "bool" - }, - { - "name": "wal_level", - "value": "replica", - "vartype": "enum" - }, - { - "name": "neon.safekeepers", - "value": """ - + f'"{cfg_map["neon.safekeepers"]}"' - + """, - "vartype": "string" - }, - { - "name": "wal_log_hints", - "value": "on", - "vartype": "bool" - }, - { - "name": "log_connections", - "value": "on", - "vartype": "bool" - }, - { - "name": "shared_buffers", - "value": "32768", - "vartype": "integer" - }, - { - "name": "port", - "value": """ - + f'"{cfg_map["port"]}"' - + """, - "vartype": "integer" - }, - { - "name": "max_connections", - "value": "100", - "vartype": "integer" - }, - { - "name": "max_wal_senders", - "value": "10", - "vartype": "integer" - }, - { - "name": "listen_addresses", - "value": "0.0.0.0", - "vartype": "string" - }, - { - "name": "wal_sender_timeout", - "value": "0", - "vartype": "integer" - }, - { - "name": "password_encryption", - "value": "md5", - "vartype": "enum" - }, - { - "name": "maintenance_work_mem", - "value": "65536", - "vartype": "integer" - }, - { - "name": "max_parallel_workers", - "value": "8", - "vartype": "integer" - }, - { - "name": "max_worker_processes", - "value": "8", - "vartype": "integer" - }, - { - "name": "neon.tenant_id", - "value": """ - + f'"{cfg_map["neon.tenant_id"]}"' - + """, - "vartype": "string" - }, - { - "name": "max_replication_slots", - "value": "10", - "vartype": "integer" - }, - { - "name": "neon.timeline_id", - "value": """ - + f'"{cfg_map["neon.timeline_id"]}"' - + """, - "vartype": "string" - }, - { - "name": "shared_preload_libraries", - "value": "neon", - "vartype": "string" - }, - { - "name": "synchronous_standby_names", - "value": "walproposer", - "vartype": "string" - }, - { - "name": "neon.pageserver_connstring", - "value": """ - + f'"{cfg_map["neon.pageserver_connstring"]}"' - + """, - "vartype": "string" - } - ] - }, - "delta_operations": [ - ] -} -""" - ) - - ps_connstr = cfg_map["neon.pageserver_connstring"] - log.info(f"ps_connstr: {ps_connstr}, pgdata: {pgdata}") - - # run compute_ctl and wait for 10s - try: - ctl.raw_cli( - [ - "--connstr", - "postgres://invalid/", - "--pgdata", - pgdata, - "--spec", - spec, - "--pgbin", - pg_bin_path, - ], - timeout=10, - ) - except TimeoutExpired as exc: - ctl_logs = (exc.stderr or b"").decode("utf-8") - log.info(f"compute_ctl stderr:\n{ctl_logs}") - - 
with ExternalProcessManager(Path(pgdata) / "postmaster.pid"): - start = "starting safekeepers syncing" - end = "safekeepers synced at LSN" - start_pos = ctl_logs.index(start) - assert start_pos != -1 - end_pos = ctl_logs.index(end, start_pos) - assert end_pos != -1 - sync_safekeepers_logs = ctl_logs[start_pos : end_pos + len(end)] - log.info("sync_safekeepers_logs:\n" + sync_safekeepers_logs) - - # assert that --sync-safekeepers logs are present in the output - assert "connecting with node" in sync_safekeepers_logs - assert "connected with node" in sync_safekeepers_logs - assert "proposer connected to quorum (2)" in sync_safekeepers_logs - assert "got votes from majority (2)" in sync_safekeepers_logs - assert "sending elected msg to node" in sync_safekeepers_logs - - -class ExternalProcessManager: - """ - Context manager that kills a process with a pid file on exit. - """ - - def __init__(self, pid_file: Path): - self.path = pid_file - self.pid_file = open(pid_file, "r") - self.pid = int(self.pid_file.readline().strip()) - - def __enter__(self): - return self - - def leave_alive(self): - self.pid_file.close() - - def __exit__(self, _type, _value, _traceback): - import signal - import time - - if self.pid_file.closed: - return - - with self.pid_file: - try: - os.kill(self.pid, signal.SIGTERM) - except OSError as e: - if not self.path.is_file(): - return - log.info(f"Failed to kill {self.pid}, but the pidfile remains: {e}") - return - - for _ in range(20): - if not self.path.is_file(): - return - time.sleep(0.2) - - log.info("Process failed to stop after SIGTERM: {self.pid}") - os.kill(self.pid, signal.SIGKILL) diff --git a/test_runner/regress/test_neon_local_cli.py b/test_runner/regress/test_neon_local_cli.py index f6629c54f9b9..3314e7fbf65e 100644 --- a/test_runner/regress/test_neon_local_cli.py +++ b/test_runner/regress/test_neon_local_cli.py @@ -9,11 +9,18 @@ def test_neon_cli_basics(neon_env_builder: NeonEnvBuilder, port_distributor: Por try: env.neon_cli.start() env.neon_cli.create_tenant(tenant_id=env.initial_tenant, set_default=True) - env.neon_cli.endpoint_start(endpoint_id="ep-main", port=port_distributor.get_port()) + + pg_port = port_distributor.get_port() + http_port = port_distributor.get_port() + env.neon_cli.endpoint_start( + endpoint_id="ep-basic-main", pg_port=pg_port, http_port=http_port + ) env.neon_cli.create_branch(new_branch_name="migration_check") + pg_port = port_distributor.get_port() + http_port = port_distributor.get_port() env.neon_cli.endpoint_start( - endpoint_id="ep-migration_check", port=port_distributor.get_port() + endpoint_id="ep-migration_check", pg_port=pg_port, http_port=http_port ) finally: env.neon_cli.stop() diff --git a/test_runner/regress/test_tenant_detach.py b/test_runner/regress/test_tenant_detach.py index f5e0e34bc9a4..acba204368e6 100644 --- a/test_runner/regress/test_tenant_detach.py +++ b/test_runner/regress/test_tenant_detach.py @@ -59,6 +59,13 @@ def test_tenant_reattach( # create new nenant tenant_id, timeline_id = env.neon_cli.create_tenant() + # Attempts to connect from compute to pageserver while the tenant is + # temporarily detached produces these errors in the pageserver log. + env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*") + env.pageserver.allowed_errors.append( + f".*Tenant {tenant_id} will not become active\\. 
Current state: Stopping.*" + ) + with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint: with endpoint.cursor() as cur: cur.execute("CREATE TABLE t(key int primary key, value text)") @@ -223,13 +230,6 @@ def test_tenant_reattach_while_busy( ) env = neon_env_builder.init_start() - # Attempts to connect from compute to pageserver while the tenant is - # temporarily detached produces these errors in the pageserver log. - env.pageserver.allowed_errors.append(".*Tenant .* not found.*") - env.pageserver.allowed_errors.append( - ".*Tenant .* will not become active\\. Current state: Stopping.*" - ) - pageserver_http = env.pageserver.http_client() # create new nenant @@ -238,6 +238,13 @@ def test_tenant_reattach_while_busy( conf={"checkpoint_distance": "100000"} ) + # Attempts to connect from compute to pageserver while the tenant is + # temporarily detached produces these errors in the pageserver log. + env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*") + env.pageserver.allowed_errors.append( + f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*" + ) + endpoint = env.endpoints.create_start("main", tenant_id=tenant_id) cur = endpoint.connect().cursor() @@ -275,6 +282,13 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder): # create new nenant tenant_id, timeline_id = env.neon_cli.create_tenant() + # Attempts to connect from compute to pageserver while the tenant is + # temporarily detached produces these errors in the pageserver log. + env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*") + env.pageserver.allowed_errors.append( + f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*" + ) + # assert tenant exists on disk assert (env.repo_dir / "tenants" / str(tenant_id)).exists() @@ -336,6 +350,13 @@ def test_tenant_detach_ignored_tenant(neon_simple_env: NeonEnv): # create a new tenant tenant_id, _ = env.neon_cli.create_tenant() + # Attempts to connect from compute to pageserver while the tenant is + # temporarily detached produces these errors in the pageserver log. + env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*") + env.pageserver.allowed_errors.append( + f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*" + ) + # assert tenant exists on disk assert (env.repo_dir / "tenants" / str(tenant_id)).exists() @@ -385,6 +406,13 @@ def test_tenant_detach_regular_tenant(neon_simple_env: NeonEnv): # create a new tenant tenant_id, _ = env.neon_cli.create_tenant() + # Attempts to connect from compute to pageserver while the tenant is + # temporarily detached produces these errors in the pageserver log. + env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*") + env.pageserver.allowed_errors.append( + f".*Tenant {tenant_id} will not become active\\. 
Current state: Stopping.*" + ) + # assert tenant exists on disk assert (env.repo_dir / "tenants" / str(tenant_id)).exists() @@ -399,6 +427,7 @@ def test_tenant_detach_regular_tenant(neon_simple_env: NeonEnv): log.info("detaching regular tenant with detach ignored flag") client.tenant_detach(tenant_id, True) + log.info("regular tenant detached without error") # check that nothing is left on disk for deleted tenant @@ -432,6 +461,13 @@ def test_detach_while_attaching( tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0]) timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0]) + # Attempts to connect from compute to pageserver while the tenant is + # temporarily detached produces these errors in the pageserver log. + env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*") + env.pageserver.allowed_errors.append( + f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*" + ) + # Create table, and insert some rows. Make it big enough that it doesn't fit in # shared_buffers, otherwise the SELECT after restart will just return answer # from shared_buffers without hitting the page server, which defeats the point @@ -577,6 +613,13 @@ def test_ignored_tenant_download_missing_layers( tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0]) timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0]) + # Attempts to connect from compute to pageserver while the tenant is + # temporarily detached produces these errors in the pageserver log. + env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*") + env.pageserver.allowed_errors.append( + f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*" + ) + data_id = 1 data_secret = "very secret secret" insert_test_data(pageserver_http, tenant_id, timeline_id, data_id, data_secret, endpoint) @@ -636,6 +679,13 @@ def test_ignored_tenant_stays_broken_without_metadata( tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0]) timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0]) + # Attempts to connect from compute to pageserver while the tenant is + # temporarily detached produces these errors in the pageserver log. + env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*") + env.pageserver.allowed_errors.append( + f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*" + ) + # ignore the tenant and remove its metadata pageserver_http.tenant_ignore(tenant_id) tenant_timeline_dir = env.repo_dir / "tenants" / str(tenant_id) / "timelines" / str(timeline_id) @@ -672,6 +722,13 @@ def test_load_attach_negatives( tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0]) + # Attempts to connect from compute to pageserver while the tenant is + # temporarily detached produces these errors in the pageserver log. + env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*") + env.pageserver.allowed_errors.append( + f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*" + ) + env.pageserver.allowed_errors.append(".*tenant .*? 
already exists, state:.*") with pytest.raises( expected_exception=PageserverApiException, @@ -714,6 +771,13 @@ def test_ignore_while_attaching( tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0]) timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0]) + # Attempts to connect from compute to pageserver while the tenant is + # temporarily detached produces these errors in the pageserver log. + env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*") + env.pageserver.allowed_errors.append( + f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*" + ) + data_id = 1 data_secret = "very secret secret" insert_test_data(pageserver_http, tenant_id, timeline_id, data_id, data_secret, endpoint) diff --git a/test_runner/regress/test_tenant_size.py b/test_runner/regress/test_tenant_size.py index 60ab268882c0..555096e26810 100644 --- a/test_runner/regress/test_tenant_size.py +++ b/test_runner/regress/test_tenant_size.py @@ -332,7 +332,7 @@ def test_single_branch_get_tenant_size_grows( # inserts is larger than gc_horizon. for example 0x20000 here hid the fact # that there next_gc_cutoff could be smaller than initdb_lsn, which will # obviously lead to issues when calculating the size. - gc_horizon = 0x38000 + gc_horizon = 0x40000 neon_env_builder.pageserver_config_override = f"tenant_config={{compaction_period='0s', gc_period='0s', pitr_interval='0sec', gc_horizon={gc_horizon}}}" env = neon_env_builder.init_start() diff --git a/test_runner/regress/test_tenants.py b/test_runner/regress/test_tenants.py index 15712b9e5506..aef2df49321c 100644 --- a/test_runner/regress/test_tenants.py +++ b/test_runner/regress/test_tenants.py @@ -267,6 +267,7 @@ def test_pageserver_metrics_removed_after_detach( cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'") cur.execute("SELECT sum(key) FROM t") assert cur.fetchone() == (5000050000,) + endpoint.stop() def get_ps_metric_samples_for_tenant(tenant_id: TenantId) -> List[Sample]: ps_metrics = env.pageserver.http_client().get_metrics() diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index 2a4141ed30be..8b595596cb94 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -1001,9 +1001,6 @@ def test_safekeeper_without_pageserver( def test_replace_safekeeper(neon_env_builder: NeonEnvBuilder): - def safekeepers_guc(env: NeonEnv, sk_names: List[int]) -> str: - return ",".join([f"localhost:{sk.port.pg}" for sk in env.safekeepers if sk.id in sk_names]) - def execute_payload(endpoint: Endpoint): with closing(endpoint.connect()) as conn: with conn.cursor() as cur: @@ -1032,9 +1029,8 @@ def show_statuses(safekeepers: List[Safekeeper], tenant_id: TenantId, timeline_i log.info("Use only first 3 safekeepers") env.safekeepers[3].stop() - active_safekeepers = [1, 2, 3] endpoint = env.endpoints.create("test_replace_safekeeper") - endpoint.adjust_for_safekeepers(safekeepers_guc(env, active_safekeepers)) + endpoint.active_safekeepers = [1, 2, 3] endpoint.start() # learn neon timeline from compute @@ -1072,9 +1068,8 @@ def show_statuses(safekeepers: List[Safekeeper], tenant_id: TenantId, timeline_i log.info("Recreate postgres to replace failed sk1 with new sk4") endpoint.stop_and_destroy().create("test_replace_safekeeper") - active_safekeepers = [2, 3, 4] env.safekeepers[3].start() - endpoint.adjust_for_safekeepers(safekeepers_guc(env, active_safekeepers)) + endpoint.active_safekeepers = [2, 3, 4] 
endpoint.start() execute_payload(endpoint) @@ -1293,9 +1288,8 @@ def show_statuses(safekeepers: List[Safekeeper], tenant_id: TenantId, timeline_i log.info("Use only first 3 safekeepers") env.safekeepers[3].stop() - active_safekeepers = [1, 2, 3] endpoint = env.endpoints.create("test_pull_timeline") - endpoint.adjust_for_safekeepers(safekeepers_guc(env, active_safekeepers)) + endpoint.active_safekeepers = [1, 2, 3] endpoint.start() # learn neon timeline from compute @@ -1332,10 +1326,8 @@ def show_statuses(safekeepers: List[Safekeeper], tenant_id: TenantId, timeline_i show_statuses(env.safekeepers, tenant_id, timeline_id) log.info("Restarting compute with new config to verify that it works") - active_safekeepers = [1, 3, 4] - endpoint.stop_and_destroy().create("test_pull_timeline") - endpoint.adjust_for_safekeepers(safekeepers_guc(env, active_safekeepers)) + endpoint.active_safekeepers = [1, 3, 4] endpoint.start() execute_payload(endpoint) diff --git a/test_runner/regress/test_wal_acceptor_async.py b/test_runner/regress/test_wal_acceptor_async.py index 7debeed1406e..ce33975a0e7b 100644 --- a/test_runner/regress/test_wal_acceptor_async.py +++ b/test_runner/regress/test_wal_acceptor_async.py @@ -2,9 +2,11 @@ import random import time from dataclasses import dataclass +from pathlib import Path from typing import List, Optional import asyncpg +import toml from fixtures.log_helper import getLogger from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonEnvBuilder, Safekeeper from fixtures.types import Lsn, TenantId, TimelineId @@ -251,7 +253,8 @@ def endpoint_create_start(env: NeonEnv, branch: str, pgdir_name: Optional[str]): endpoint = Endpoint( env, tenant_id=env.initial_tenant, - port=env.port_distributor.get_port(), + pg_port=env.port_distributor.get_port(), + http_port=env.port_distributor.get_port(), # In these tests compute has high probability of terminating on its own # before our stop() due to lost consensus leadership. check_stop_result=False, @@ -536,15 +539,20 @@ def test_race_conditions(neon_env_builder: NeonEnvBuilder): # Check that pageserver can select safekeeper with largest commit_lsn # and switch if LSN is not updated for some time (NoWalTimeout). -async def run_wal_lagging(env: NeonEnv, endpoint: Endpoint): - def safekeepers_guc(env: NeonEnv, active_sk: List[bool]) -> str: - # use ports 10, 11 and 12 to simulate unavailable safekeepers - return ",".join( - [ - f"localhost:{sk.port.pg if active else 10 + i}" - for i, (sk, active) in enumerate(zip(env.safekeepers, active_sk)) - ] - ) +async def run_wal_lagging(env: NeonEnv, endpoint: Endpoint, test_output_dir: Path): + def adjust_safekeepers(env: NeonEnv, active_sk: List[bool]): + # Change the pg ports of the inactive safekeepers in the config file to be + # invalid, to make them unavailable to the endpoint. We use + # ports 10, 11 and 12 to simulate unavailable safekeepers. 
+ config = toml.load(test_output_dir / "repo" / "config") + for i, (sk, active) in enumerate(zip(env.safekeepers, active_sk)): + if active: + config["safekeepers"][i]["pg_port"] = env.safekeepers[i].port.pg + else: + config["safekeepers"][i]["pg_port"] = 10 + i + + with open(test_output_dir / "repo" / "config", "w") as f: + toml.dump(config, f) conn = await endpoint.connect_async() await conn.execute("CREATE TABLE t(key int primary key, value text)") @@ -565,7 +573,7 @@ def safekeepers_guc(env: NeonEnv, active_sk: List[bool]) -> str: it -= 1 continue - endpoint.adjust_for_safekeepers(safekeepers_guc(env, active_sk)) + adjust_safekeepers(env, active_sk) log.info(f"Iteration {it}: {active_sk}") endpoint.start() @@ -579,7 +587,7 @@ def safekeepers_guc(env: NeonEnv, active_sk: List[bool]) -> str: await conn.close() endpoint.stop() - endpoint.adjust_for_safekeepers(safekeepers_guc(env, [True] * len(env.safekeepers))) + adjust_safekeepers(env, [True] * len(env.safekeepers)) endpoint.start() conn = await endpoint.connect_async() @@ -590,11 +598,11 @@ def safekeepers_guc(env: NeonEnv, active_sk: List[bool]) -> str: # do inserts while restarting postgres and messing with safekeeper addresses -def test_wal_lagging(neon_env_builder: NeonEnvBuilder): +def test_wal_lagging(neon_env_builder: NeonEnvBuilder, test_output_dir: Path): neon_env_builder.num_safekeepers = 3 env = neon_env_builder.init_start() env.neon_cli.create_branch("test_wal_lagging") endpoint = env.endpoints.create_start("test_wal_lagging") - asyncio.run(run_wal_lagging(env, endpoint)) + asyncio.run(run_wal_lagging(env, endpoint, test_output_dir)) diff --git a/test_runner/regress/test_walredo_not_left_behind_on_detach.py b/test_runner/regress/test_walredo_not_left_behind_on_detach.py index 7d944bebb3cb..4a478989356a 100644 --- a/test_runner/regress/test_walredo_not_left_behind_on_detach.py +++ b/test_runner/regress/test_walredo_not_left_behind_on_detach.py @@ -83,6 +83,9 @@ def test_walredo_not_left_behind_on_detach(neon_env_builder: NeonEnvBuilder): # XXX this is quite brittle as the lifecycle of the WAL redo process is an implementation detail assert_child_processes(pagserver_pid, wal_redo_present=True, defunct_present=False) + # Stop the compute before detaching, to avoid errors in the log. + endpoint.stop() + last_error = None for i in range(3): try: From cae9a69f899ddad3a97a189128e128172bab6a44 Mon Sep 17 00:00:00 2001 From: Anastasia Lubennikova Date: Thu, 1 Jun 2023 18:31:44 +0100 Subject: [PATCH 04/12] Remove obsolete comment about PostgresNode. It was renamed to Endpoint and refactored to reduce code duplication. --- compute_tools/src/compute.rs | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index e5d189268187..4a297dcb6ef2 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -1,19 +1,3 @@ -// -// XXX: This starts to be scarry similar to the `PostgresNode` from `control_plane`, -// but there are several things that makes `PostgresNode` usage inconvenient in the -// cloud: -// - it inherits from `LocalEnv`, which contains **all-all** the information about -// a complete service running -// - it uses `PageServerNode` with information about http endpoint, which we do not -// need in the cloud again -// - many tiny pieces like, for example, we do not use `pg_ctl` in the cloud -// -// Thus, to use `PostgresNode` in the cloud, we need to 'mock' a bunch of required -// attributes (not required for the cloud). 
Yet, it is still tempting to unify these
-// `PostgresNode` and `ComputeNode` and use one in both places.
-//
-// TODO: stabilize `ComputeNode` and think about using it in the `control_plane`.
-//
 use std::fs;
 use std::os::unix::fs::PermissionsExt;
 use std::path::Path;

From 44c2c79d26d44c414632a8b70b4a8abc948cd278 Mon Sep 17 00:00:00 2001
From: Anastasia Lubennikova
Date: Thu, 1 Jun 2023 20:49:35 +0100
Subject: [PATCH 05/12] WIP bring back replication settings

---
 control_plane/src/endpoint.rs | 104 +++++++++++++++++++++++++++-------
 1 file changed, 82 insertions(+), 22 deletions(-)

diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs
index a4ee2f3a711e..e245d2107ec5 100644
--- a/control_plane/src/endpoint.rs
+++ b/control_plane/src/endpoint.rs
@@ -235,11 +235,6 @@ impl Endpoint {
         conf.append("wal_log_hints", "off");
         conf.append("max_replication_slots", "10");
         conf.append("hot_standby", "on");
-        // prefetching of blocks referenced in WAL doesn't make sense for us
-        // Neon hot standby ignores pages that are not in the shared_buffers
-        if self.pg_version >= 15 {
-            conf.append("recovery_prefetch", "off");
-        }
         conf.append("shared_buffers", "1MB");
         conf.append("fsync", "off");
         conf.append("max_connections", "100");
@@ -255,24 +250,89 @@ impl Endpoint {
         // Load the 'neon' extension
         conf.append("shared_preload_libraries", "neon");
 
-        conf.append_line("");
-        // Configure backpressure
-        // - Replication write lag depends on how fast the walreceiver can process incoming WAL.
-        // This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
-        // so to avoid expiration of 1 minute timeout, this lag should not be larger than 600MB.
-        // Actually latency should be much smaller (better if < 1sec). But we assume that recently
-        // updates pages are not requested from pageserver.
-        // - Replication flush lag depends on speed of persisting data by checkpointer (creation of
-        // delta/image layers) and advancing disk_consistent_lsn. Safekeepers are able to
-        // remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause long
-        // recovery time (in case of pageserver crash) and disk space overflow at safekeepers.
-        // - Replication apply lag depends on speed of uploading changes to S3 by uploader thread.
-        // To be able to restore database in case of pageserver node crash, safekeeper should not
-        // remove WAL beyond this point. Too large lag can cause space exhaustion in safekeepers
-        // (if they are not able to upload WAL to S3).
-        conf.append("max_replication_write_lag", "15MB");
-        conf.append("max_replication_flush_lag", "10GB");
+
+        conf.append_line("");
+        // Replication-related configurations, such as WAL sending
+        match &self.mode {
+            ComputeMode::Primary => {
+                // Configure backpressure
+                // - Replication write lag depends on how fast the walreceiver can process incoming WAL.
+                // This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
+                // so to avoid expiration of 1 minute timeout, this lag should not be larger than 600MB.
+                // Actually latency should be much smaller (better if < 1sec). But we assume that recently
+                // updated pages are not requested from pageserver.
+                // - Replication flush lag depends on speed of persisting data by checkpointer (creation of
+                // delta/image layers) and advancing disk_consistent_lsn. Safekeepers are able to
+                // remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause long
+                // recovery time (in case of pageserver crash) and disk space overflow at safekeepers.
+                // - Replication apply lag depends on speed of uploading changes to S3 by uploader thread.
+                // To be able to restore database in case of pageserver node crash, safekeeper should not
+                // remove WAL beyond this point. Too large a lag can cause space exhaustion in safekeepers
+                // (if they are not able to upload WAL to S3).
+                conf.append("max_replication_write_lag", "15MB");
+                conf.append("max_replication_flush_lag", "10GB");
+
+                if !self.env.safekeepers.is_empty() {
+                    // Configure Postgres to connect to the safekeepers
+                    conf.append("synchronous_standby_names", "walproposer");
+
+                    let safekeepers = self
+                        .env
+                        .safekeepers
+                        .iter()
+                        .map(|sk| format!("localhost:{}", sk.pg_port))
+                        .collect::<Vec<String>>()
+                        .join(",");
+                    conf.append("neon.safekeepers", &safekeepers);
+                } else {
+                    // We only use setup without safekeepers for tests,
+                    // and don't care about data durability on pageserver,
+                    // so set more relaxed synchronous_commit.
+                    conf.append("synchronous_commit", "remote_write");
+
+                    // Configure the node to stream WAL directly to the pageserver
+                    // This isn't really a supported configuration, but can be useful for
+                    // testing.
+                    conf.append("synchronous_standby_names", "pageserver");
+                }
+            }
+            ComputeMode::Static(lsn) => {
+                conf.append("recovery_target_lsn", &lsn.to_string());
+            }
+            ComputeMode::Replica => {
+                assert!(!self.env.safekeepers.is_empty());
+
+                // TODO: use future host field from safekeeper spec
+                // Pass the list of safekeepers to the replica so that it can connect to any of them,
+                // whichever is available.
+                let sk_ports = self
+                    .env
+                    .safekeepers
+                    .iter()
+                    .map(|x| x.pg_port.to_string())
+                    .collect::<Vec<String>>()
+                    .join(",");
+                let sk_hosts = vec!["localhost"; self.env.safekeepers.len()].join(",");
+
+                let connstr = format!(
+                    "host={} port={} options='-c timeline_id={} tenant_id={}' application_name=replica replication=true",
+                    sk_hosts,
+                    sk_ports,
+                    &self.timeline_id.to_string(),
+                    &self.tenant_id.to_string(),
+                );
+
+                let slot_name = format!("repl_{}_", self.timeline_id);
+                conf.append("primary_conninfo", connstr.as_str());
+                conf.append("primary_slot_name", slot_name.as_str());
+                conf.append("hot_standby", "on");
+                // prefetching of blocks referenced in WAL doesn't make sense for us
+                // Neon hot standby ignores pages that are not in the shared_buffers
+                if self.pg_version >= 15 {
+                    conf.append("recovery_prefetch", "off");
+                }
+            }
+        }
 
         if !self.env.safekeepers.is_empty() {
             // Configure Postgres to connect to the safekeepers

From 28336cd9fa2b65ea2c99f0a46f21a03a396ed45b Mon Sep 17 00:00:00 2001
From: Anastasia Lubennikova
Date: Thu, 1 Jun 2023 21:53:48 +0100
Subject: [PATCH 06/12] WIP only set safekeeper_connstrings for Primary

---
 control_plane/src/endpoint.rs | 33 ++++++++++-----------------------
 1 file changed, 10 insertions(+), 23 deletions(-)

diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs
index e245d2107ec5..4f6f58ef4405 100644
--- a/control_plane/src/endpoint.rs
+++ b/control_plane/src/endpoint.rs
@@ -334,21 +334,6 @@ impl Endpoint {
             }
         }
 
-        if !self.env.safekeepers.is_empty() {
-            // Configure Postgres to connect to the safekeepers
-            conf.append("synchronous_standby_names", "walproposer");
-        } else {
-            // We only use setup without safekeepers for tests,
-            // and don't care about data durability on pageserver,
-            // so set more relaxed synchronous_commit.
- conf.append("synchronous_commit", "remote_write"); - - // Configure the node to stream WAL directly to the pageserver - // This isn't really a supported configuration, but can be useful for - // testing. - conf.append("synchronous_standby_names", "pageserver"); - } - Ok(conf) } @@ -451,14 +436,16 @@ impl Endpoint { format!("postgresql://no_user@{host}:{port}") }; let mut safekeeper_connstrings = Vec::new(); - for sk_id in safekeepers { - let sk = self - .env - .safekeepers - .iter() - .find(|node| node.id == sk_id) - .ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?; - safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.pg_port)); + if self.mode == ComputeMode::Primary && safekeepers.is_empty() { + for sk_id in safekeepers { + let sk = self + .env + .safekeepers + .iter() + .find(|node| node.id == sk_id) + .ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?; + safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.pg_port)); + } } // Create spec file From a8f4b5def3203bba19ab573c36de29a5758b287a Mon Sep 17 00:00:00 2001 From: Anastasia Lubennikova Date: Thu, 1 Jun 2023 22:08:29 +0100 Subject: [PATCH 07/12] WIP Create standby.signal file for Static compute node --- compute_tools/src/compute.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 4a297dcb6ef2..617b330704d7 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -291,8 +291,8 @@ impl ComputeNode { update_pg_hba(pgdata_path)?; match spec.mode { - ComputeMode::Primary | ComputeMode::Static(..) => {} - ComputeMode::Replica => { + ComputeMode::Primary => {} + ComputeMode::Replica | ComputeMode::Static(..) => { add_standby_signal(pgdata_path)?; } } From ef9557ec935c6c9daf4a74b6ecc4eae91ab85435 Mon Sep 17 00:00:00 2001 From: Anastasia Lubennikova Date: Mon, 5 Jun 2023 14:50:59 +0100 Subject: [PATCH 08/12] Fix err parsing in test_pageserver_lsn_wait_error_safekeeper_stop --- test_runner/regress/test_wal_receiver.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test_runner/regress/test_wal_receiver.py b/test_runner/regress/test_wal_receiver.py index 8e4e154be1fd..515d47c079ee 100644 --- a/test_runner/regress/test_wal_receiver.py +++ b/test_runner/regress/test_wal_receiver.py @@ -77,7 +77,8 @@ def test_pageserver_lsn_wait_error_safekeeper_stop(neon_env_builder: NeonEnvBuil try: trigger_wait_lsn_timeout(env, tenant_id) except Exception as e: - exception_string = str(e) + # Strip out the part before stdout, as it contains full command with the list of all safekeepers + exception_string = str(e).split("stdout", 1)[-1] assert expected_timeout_error in exception_string, "Should time out during waiting for WAL" for safekeeper in env.safekeepers: From 6475c7c3c1c1b86117587dcc008caaec761f685f Mon Sep 17 00:00:00 2001 From: Anastasia Lubennikova Date: Mon, 5 Jun 2023 18:08:46 +0100 Subject: [PATCH 09/12] fix for only set safekeeper_connstring --- control_plane/src/endpoint.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index 4f6f58ef4405..b28315a35deb 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -436,7 +436,7 @@ impl Endpoint { format!("postgresql://no_user@{host}:{port}") }; let mut safekeeper_connstrings = Vec::new(); - if self.mode == ComputeMode::Primary && safekeepers.is_empty() { + if self.mode == ComputeMode::Primary { for sk_id in safekeepers { let sk = self .env 
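Note on PATCH 06 and PATCH 09: the WIP guard "self.mode == ComputeMode::Primary && safekeepers.is_empty()" entered the branch only when the safekeepers list was empty, and then iterated over that same empty list, so safekeeper_connstrings was never populated for a primary compute. Dropping the second clause is the whole fix. Below is a minimal, self-contained sketch of the corrected behavior; the types are simplified stand-ins for illustration only, not the actual control_plane definitions (the real ComputeMode::Static carries an LSN, and the real code resolves safekeeper IDs through the local environment).

    // Simplified stand-ins for the real control_plane types (illustration only).
    #[derive(PartialEq)]
    enum ComputeMode {
        Primary,
        Replica,
        Static,
    }

    // Assemble the spec's safekeeper connection strings. Only a primary compute
    // gets them, mirroring "only set safekeeper_connstrings for Primary" above.
    fn safekeeper_connstrings(mode: &ComputeMode, sk_pg_ports: &[u16]) -> Vec<String> {
        let mut connstrings = Vec::new();
        // The buggy guard additionally required the port list to be empty, which
        // made this loop a no-op: the branch was only taken when there was
        // nothing to iterate over.
        if *mode == ComputeMode::Primary {
            for port in sk_pg_ports {
                connstrings.push(format!("127.0.0.1:{port}"));
            }
        }
        connstrings
    }

    fn main() {
        let ports = [5454, 5455, 5456];
        assert_eq!(safekeeper_connstrings(&ComputeMode::Primary, &ports).len(), 3);
        assert!(safekeeper_connstrings(&ComputeMode::Replica, &ports).is_empty());
        assert!(safekeeper_connstrings(&ComputeMode::Static, &ports).is_empty());
    }

Replica and static computes intentionally end up with an empty list: a replica reaches the safekeepers through primary_conninfo instead (see PATCH 05).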
From 99d27fa9378e5cd337148d7956f33f0556053aa1 Mon Sep 17 00:00:00 2001
From: Anastasia Lubennikova
Date: Mon, 5 Jun 2023 20:37:59 +0100
Subject: [PATCH 10/12] Fix allowed error message in test_ignored_tenant_stays_broken_without_metadata

---
 test_runner/regress/test_tenant_detach.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/test_runner/regress/test_tenant_detach.py b/test_runner/regress/test_tenant_detach.py
index acba204368e6..9d0fdcfaf8eb 100644
--- a/test_runner/regress/test_tenant_detach.py
+++ b/test_runner/regress/test_tenant_detach.py
@@ -683,7 +683,7 @@ def test_ignored_tenant_stays_broken_without_metadata(
     # temporarily detached produces these errors in the pageserver log.
     env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
     env.pageserver.allowed_errors.append(
-        f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
+        f".*Tenant {tenant_id} will not become active\\. Current state: Broken.*"
     )
 
     # ignore the tenant and remove its metadata

From 177e02caf4cebe40218f4d65b8af9ebdc399458c Mon Sep 17 00:00:00 2001
From: Anastasia Lubennikova
Date: Mon, 5 Jun 2023 21:35:53 +0100
Subject: [PATCH 11/12] fix flaky test_single_branch_get_tenant_size_grows on v15

---
 test_runner/regress/test_tenant_size.py | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/test_runner/regress/test_tenant_size.py b/test_runner/regress/test_tenant_size.py
index 555096e26810..f121451a2b88 100644
--- a/test_runner/regress/test_tenant_size.py
+++ b/test_runner/regress/test_tenant_size.py
@@ -318,7 +318,8 @@ def test_only_heads_within_horizon(neon_simple_env: NeonEnv, test_output_dir: Pa
 
 
 def test_single_branch_get_tenant_size_grows(
-    neon_env_builder: NeonEnvBuilder, test_output_dir: Path
+    neon_env_builder: NeonEnvBuilder, test_output_dir: Path,
+    pg_version: PgVersion
 ):
     """
     Operate on single branch reading the tenants size after each transaction.
@@ -332,7 +333,14 @@ def test_single_branch_get_tenant_size_grows(
     # inserts is larger than gc_horizon. for example 0x20000 here hid the fact
     # that there next_gc_cutoff could be smaller than initdb_lsn, which will
     # obviously lead to issues when calculating the size.
-    gc_horizon = 0x40000
+    gc_horizon=0x38000
+
+    # it's a bit of a hack, but different versions of postgres have different
+    # amounts of WAL generated for the same amount of data, so we need to
+    # adjust the gc_horizon accordingly.
+    if pg_version == PgVersion.V14:
+        gc_horizon = 0x40000
+
     neon_env_builder.pageserver_config_override = f"tenant_config={{compaction_period='0s', gc_period='0s', pitr_interval='0sec', gc_horizon={gc_horizon}}}"
 
     env = neon_env_builder.init_start()

From 1282305d667e075d36cbc85a9231df6803336109 Mon Sep 17 00:00:00 2001
From: Anastasia Lubennikova
Date: Tue, 6 Jun 2023 11:41:40 +0100
Subject: [PATCH 12/12] pytest codestyle fix

---
 test_runner/regress/test_tenant_size.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/test_runner/regress/test_tenant_size.py b/test_runner/regress/test_tenant_size.py
index f121451a2b88..e9dcd1e5cdee 100644
--- a/test_runner/regress/test_tenant_size.py
+++ b/test_runner/regress/test_tenant_size.py
@@ -318,8 +318,7 @@ def test_only_heads_within_horizon(neon_simple_env: NeonEnv, test_output_dir: Pa
 
 
 def test_single_branch_get_tenant_size_grows(
-    neon_env_builder: NeonEnvBuilder, test_output_dir: Path,
-    pg_version: PgVersion
+    neon_env_builder: NeonEnvBuilder, test_output_dir: Path, pg_version: PgVersion
 ):
     """
     Operate on single branch reading the tenants size after each transaction.
@@ -333,7 +332,7 @@ def test_single_branch_get_tenant_size_grows(
     # inserts is larger than gc_horizon. for example 0x20000 here hid the fact
     # that there next_gc_cutoff could be smaller than initdb_lsn, which will
     # obviously lead to issues when calculating the size.
-    gc_horizon=0x38000
+    gc_horizon = 0x38000
 
     # it's a bit of a hack, but different versions of postgres have different
    # amounts of WAL generated for the same amount of data, so we need to