From f0b2e076d9beb3601049388dc2d7d94ef7b68f23 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Sun, 9 Apr 2023 21:52:28 +0300 Subject: [PATCH] Move compute_ctl structs used in HTTP API and spec file to separate crate. This is in preparation of using compute_ctl to launch postgres nodes in the neon_local control plane. And seems like a good idea to separate the public interfaces anyway. One non-mechanical change here is that the 'metrics' field is moved under the Mutex, instead of using atomics. We were not using atomics for performance but for convenience here, and it seems more clear to not use atomics in the model for the HTTP response type. --- Cargo.lock | 13 +++ Cargo.toml | 1 + compute_tools/Cargo.toml | 1 + compute_tools/src/bin/compute_ctl.rs | 5 +- compute_tools/src/compute.rs | 81 +++++----------- compute_tools/src/config.rs | 2 +- compute_tools/src/http/api.rs | 26 +++-- compute_tools/src/http/mod.rs | 2 - compute_tools/src/http/responses.rs | 40 -------- compute_tools/src/pg_helpers.rs | 78 +++++---------- compute_tools/src/spec.rs | 43 +-------- compute_tools/tests/pg_helpers_tests.rs | 7 +- libs/compute_api/Cargo.toml | 14 +++ libs/compute_api/src/lib.rs | 3 + .../http => libs/compute_api/src}/requests.rs | 5 +- libs/compute_api/src/responses.rs | 66 +++++++++++++ libs/compute_api/src/spec.rs | 94 +++++++++++++++++++ .../compute_api}/tests/cluster_spec.json | 0 18 files changed, 271 insertions(+), 210 deletions(-) delete mode 100644 compute_tools/src/http/responses.rs create mode 100644 libs/compute_api/Cargo.toml create mode 100644 libs/compute_api/src/lib.rs rename {compute_tools/src/http => libs/compute_api/src}/requests.rs (76%) create mode 100644 libs/compute_api/src/responses.rs create mode 100644 libs/compute_api/src/spec.rs rename {compute_tools => libs/compute_api}/tests/cluster_spec.json (100%) diff --git a/Cargo.lock b/Cargo.lock index 4590e7601417..c5b64b235a37 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -841,6 +841,18 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "compute_api" +version = "0.1.0" +dependencies = [ + "anyhow", + "chrono", + "serde", + "serde_json", + "serde_with", + "workspace_hack", +] + [[package]] name = "compute_tools" version = "0.1.0" @@ -848,6 +860,7 @@ dependencies = [ "anyhow", "chrono", "clap 4.1.4", + "compute_api", "futures", "hyper", "notify", diff --git a/Cargo.toml b/Cargo.toml index 09cc150606fc..d563324c8605 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -132,6 +132,7 @@ tokio-tar = { git = "https://github.com/neondatabase/tokio-tar.git", rev="404df6 heapless = { default-features=false, features=[], git = "https://github.com/japaric/heapless.git", rev = "644653bf3b831c6bb4963be2de24804acf5e5001" } # upstream release pending ## Local libraries +compute_api = { version = "0.1", path = "./libs/compute_api/" } consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" } metrics = { version = "0.1", path = "./libs/metrics/" } pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" } diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml index 59433535f1e0..f315d2b7d9aa 100644 --- a/compute_tools/Cargo.toml +++ b/compute_tools/Cargo.toml @@ -27,4 +27,5 @@ tracing-subscriber.workspace = true tracing-utils.workspace = true url.workspace = true +compute_api.workspace = true workspace_hack.workspace = true diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 1a3ac77af4cf..d61eae5f7a18 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -43,7 +43,9 @@ use clap::Arg; use tracing::{error, info}; use url::Url; -use compute_tools::compute::{ComputeMetrics, ComputeNode, ComputeState, ComputeStatus}; +use compute_api::responses::ComputeStatus; + +use compute_tools::compute::{ComputeNode, ComputeState}; use compute_tools::http::api::launch_http_server; use compute_tools::logger::*; use compute_tools::monitor::launch_monitor; @@ -116,7 +118,6 @@ fn main() -> Result<()> { pgdata: pgdata.to_string(), pgbin: pgbin.to_string(), live_config_allowed, - metrics: ComputeMetrics::default(), state: Mutex::new(new_state), state_changed: Condvar::new(), }; diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 3e92ec57dcd7..689aa6ef43b7 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -19,16 +19,17 @@ use std::os::unix::fs::PermissionsExt; use std::path::Path; use std::process::{Command, Stdio}; use std::str::FromStr; -use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Condvar, Mutex}; use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; use postgres::{Client, NoTls}; -use serde::Serialize; use tokio_postgres; use tracing::{info, instrument, warn}; +use compute_api::responses::{ComputeMetrics, ComputeStatus}; +use compute_api::spec::ComputeSpec; + use crate::checker::create_writability_check_data; use crate::config; use crate::pg_helpers::*; @@ -41,7 +42,6 @@ pub struct ComputeNode { pub connstr: url::Url, pub pgdata: String, pub pgbin: String, - pub metrics: ComputeMetrics, /// We should only allow live re- / configuration of the compute node if /// it uses 'pull model', i.e. it can go to control-plane and fetch /// the latest configuration. Otherwise, there could be a case: @@ -74,6 +74,8 @@ pub struct ComputeState { pub timeline: String, pub pageserver_connstr: String, pub storage_auth_token: Option, + + pub metrics: ComputeMetrics, } impl ComputeState { @@ -87,6 +89,7 @@ impl ComputeState { timeline: String::new(), pageserver_connstr: String::new(), storage_auth_token: None, + metrics: ComputeMetrics::default(), } } } @@ -97,33 +100,6 @@ impl Default for ComputeState { } } -#[derive(Serialize, Clone, Copy, PartialEq, Eq, Debug)] -#[serde(rename_all = "snake_case")] -pub enum ComputeStatus { - // Spec wasn't provided at start, waiting for it to be - // provided by control-plane. - Empty, - // Compute configuration was requested. - ConfigurationPending, - // Compute node has spec and initial startup and - // configuration is in progress. - Init, - // Compute is configured and running. - Running, - // Either startup or configuration failed, - // compute will exit soon or is waiting for - // control-plane to terminate it. - Failed, -} - -#[derive(Default, Serialize)] -pub struct ComputeMetrics { - pub sync_safekeepers_ms: AtomicU64, - pub basebackup_ms: AtomicU64, - pub config_ms: AtomicU64, - pub total_startup_ms: AtomicU64, -} - impl ComputeNode { pub fn set_status(&self, status: ComputeStatus) { let mut state = self.state.lock().unwrap(); @@ -185,15 +161,11 @@ impl ComputeNode { ar.set_ignore_zeros(true); ar.unpack(&self.pgdata)?; - self.metrics.basebackup_ms.store( - Utc::now() - .signed_duration_since(start_time) - .to_std() - .unwrap() - .as_millis() as u64, - Ordering::Relaxed, - ); - + self.state.lock().unwrap().metrics.basebackup_ms = Utc::now() + .signed_duration_since(start_time) + .to_std() + .unwrap() + .as_millis() as u64; Ok(()) } @@ -231,14 +203,11 @@ impl ComputeNode { ); } - self.metrics.sync_safekeepers_ms.store( - Utc::now() - .signed_duration_since(start_time) - .to_std() - .unwrap() - .as_millis() as u64, - Ordering::Relaxed, - ); + self.state.lock().unwrap().metrics.sync_safekeepers_ms = Utc::now() + .signed_duration_since(start_time) + .to_std() + .unwrap() + .as_millis() as u64; let lsn = String::from(String::from_utf8(sync_output.stdout)?.trim()); @@ -375,23 +344,19 @@ impl ComputeNode { self.apply_config(&compute_state)?; let startup_end_time = Utc::now(); - self.metrics.config_ms.store( - startup_end_time + { + let mut state = self.state.lock().unwrap(); + state.metrics.config_ms = startup_end_time .signed_duration_since(start_time) .to_std() .unwrap() - .as_millis() as u64, - Ordering::Relaxed, - ); - self.metrics.total_startup_ms.store( - startup_end_time + .as_millis() as u64; + state.metrics.total_startup_ms = startup_end_time .signed_duration_since(self.start_time) .to_std() .unwrap() - .as_millis() as u64, - Ordering::Relaxed, - ); - + .as_millis() as u64; + } self.set_status(ComputeStatus::Running); Ok(pg) diff --git a/compute_tools/src/config.rs b/compute_tools/src/config.rs index 6cbd0e3d4c25..d25eb9b2fce9 100644 --- a/compute_tools/src/config.rs +++ b/compute_tools/src/config.rs @@ -6,7 +6,7 @@ use std::path::Path; use anyhow::Result; use crate::pg_helpers::PgOptionsSerialize; -use crate::spec::ComputeSpec; +use compute_api::spec::ComputeSpec; /// Check that `line` is inside a text file and put it there if it is not. /// Create file if it doesn't exist. diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs index 8620b10636c1..cea45dc596e4 100644 --- a/compute_tools/src/http/api.rs +++ b/compute_tools/src/http/api.rs @@ -3,9 +3,9 @@ use std::net::SocketAddr; use std::sync::Arc; use std::thread; -use crate::compute::{ComputeNode, ComputeStatus}; -use crate::http::requests::ConfigurationRequest; -use crate::http::responses::{ComputeStatusResponse, GenericAPIError}; +use crate::compute::{ComputeNode, ComputeState}; +use compute_api::requests::ConfigurationRequest; +use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError}; use anyhow::Result; use hyper::service::{make_service_fn, service_fn}; @@ -16,6 +16,16 @@ use tokio::task; use tracing::{error, info}; use tracing_utils::http::OtelName; +fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse { + ComputeStatusResponse { + tenant: state.tenant.clone(), + timeline: state.timeline.clone(), + status: state.status, + last_active: state.last_active, + error: state.error.clone(), + } +} + // Service function to handle all available routes. async fn routes(req: Request, compute: &Arc) -> Response { // @@ -28,8 +38,7 @@ async fn routes(req: Request, compute: &Arc) -> Response { info!("serving /status GET request"); let state = compute.state.lock().unwrap(); - let status_response = ComputeStatusResponse::from(state.clone()); - + let status_response = status_response_from_state(&state); Response::new(Body::from(serde_json::to_string(&status_response).unwrap())) } @@ -37,7 +46,8 @@ async fn routes(req: Request, compute: &Arc) -> Response { info!("serving /metrics.json GET request"); - Response::new(Body::from(serde_json::to_string(&compute.metrics).unwrap())) + let metrics = compute.state.lock().unwrap().metrics.clone(); + Response::new(Body::from(serde_json::to_string(&metrics).unwrap())) } // Collect Postgres current usage insights @@ -162,7 +172,7 @@ async fn handle_configure_request( ); if state.status == ComputeStatus::Failed { - let err = state.error.clone().unwrap_or("unknown error".to_string()); + let err = state.error.as_ref().map_or("unknown error", |x| x); let msg = format!("compute configuration failed: {:?}", err); return Err((msg, StatusCode::INTERNAL_SERVER_ERROR)); } @@ -175,7 +185,7 @@ async fn handle_configure_request( // Return current compute state if everything went well. let state = compute.state.lock().unwrap().clone(); - let status_response = ComputeStatusResponse::from(state); + let status_response = status_response_from_state(&state); Ok(serde_json::to_string(&status_response).unwrap()) } else { Err(("invalid spec".to_string(), StatusCode::BAD_REQUEST)) diff --git a/compute_tools/src/http/mod.rs b/compute_tools/src/http/mod.rs index e54b4e334145..e5fdf85eed77 100644 --- a/compute_tools/src/http/mod.rs +++ b/compute_tools/src/http/mod.rs @@ -1,3 +1 @@ pub mod api; -pub mod requests; -pub mod responses; diff --git a/compute_tools/src/http/responses.rs b/compute_tools/src/http/responses.rs deleted file mode 100644 index 1ef4b380a9c4..000000000000 --- a/compute_tools/src/http/responses.rs +++ /dev/null @@ -1,40 +0,0 @@ -use serde::{Serialize, Serializer}; - -use chrono::{DateTime, Utc}; - -use crate::compute::{ComputeState, ComputeStatus}; - -#[derive(Serialize, Debug)] -pub struct GenericAPIError { - pub error: String, -} - -#[derive(Serialize, Debug)] -#[serde(rename_all = "snake_case")] -pub struct ComputeStatusResponse { - pub tenant: String, - pub timeline: String, - pub status: ComputeStatus, - #[serde(serialize_with = "rfc3339_serialize")] - pub last_active: DateTime, - pub error: Option, -} - -impl From for ComputeStatusResponse { - fn from(state: ComputeState) -> Self { - ComputeStatusResponse { - tenant: state.tenant, - timeline: state.timeline, - status: state.status, - last_active: state.last_active, - error: state.error, - } - } -} - -fn rfc3339_serialize(x: &DateTime, s: S) -> Result -where - S: Serializer, -{ - x.to_rfc3339().serialize(s) -} diff --git a/compute_tools/src/pg_helpers.rs b/compute_tools/src/pg_helpers.rs index 38d1a6d7776d..bb787d0506c4 100644 --- a/compute_tools/src/pg_helpers.rs +++ b/compute_tools/src/pg_helpers.rs @@ -10,42 +10,11 @@ use std::time::{Duration, Instant}; use anyhow::{bail, Result}; use notify::{RecursiveMode, Watcher}; use postgres::{Client, Transaction}; -use serde::Deserialize; use tracing::{debug, instrument}; -const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // milliseconds - -/// Rust representation of Postgres role info with only those fields -/// that matter for us. -#[derive(Clone, Deserialize, Debug)] -pub struct Role { - pub name: PgIdent, - pub encrypted_password: Option, - pub options: GenericOptions, -} - -/// Rust representation of Postgres database info with only those fields -/// that matter for us. -#[derive(Clone, Deserialize, Debug)] -pub struct Database { - pub name: PgIdent, - pub owner: PgIdent, - pub options: GenericOptions, -} +use compute_api::spec::{Database, GenericOption, GenericOptions, PgIdent, Role}; -/// Common type representing both SQL statement params with or without value, -/// like `LOGIN` or `OWNER username` in the `CREATE/ALTER ROLE`, and config -/// options like `wal_level = logical`. -#[derive(Clone, Deserialize, Debug)] -pub struct GenericOption { - pub name: String, - pub value: Option, - pub vartype: String, -} - -/// Optional collection of `GenericOption`'s. Type alias allows us to -/// declare a `trait` on it. -pub type GenericOptions = Option>; +const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // milliseconds /// Escape a string for including it in a SQL literal fn escape_literal(s: &str) -> String { @@ -58,9 +27,14 @@ fn escape_conf_value(s: &str) -> String { s.replace('\'', "''").replace('\\', "\\\\") } -impl GenericOption { +trait GenericOptionExt { + fn to_pg_option(&self) -> String; + fn to_pg_setting(&self) -> String; +} + +impl GenericOptionExt for GenericOption { /// Represent `GenericOption` as SQL statement parameter. - pub fn to_pg_option(&self) -> String { + fn to_pg_option(&self) -> String { if let Some(val) = &self.value { match self.vartype.as_ref() { "string" => format!("{} '{}'", self.name, escape_literal(val)), @@ -72,7 +46,7 @@ impl GenericOption { } /// Represent `GenericOption` as configuration option. - pub fn to_pg_setting(&self) -> String { + fn to_pg_setting(&self) -> String { if let Some(val) = &self.value { match self.vartype.as_ref() { "string" => format!("{} = '{}'", self.name, escape_conf_value(val)), @@ -131,10 +105,14 @@ impl GenericOptionsSearch for GenericOptions { } } -impl Role { +pub trait RoleExt { + fn to_pg_options(&self) -> String; +} + +impl RoleExt for Role { /// Serialize a list of role parameters into a Postgres-acceptable /// string of arguments. - pub fn to_pg_options(&self) -> String { + fn to_pg_options(&self) -> String { // XXX: consider putting LOGIN as a default option somewhere higher, e.g. in control-plane. // For now, we do not use generic `options` for roles. Once used, add // `self.options.as_pg_options()` somewhere here. @@ -159,21 +137,17 @@ impl Role { } } -impl Database { - pub fn new(name: PgIdent, owner: PgIdent) -> Self { - Self { - name, - owner, - options: None, - } - } +pub trait DatabaseExt { + fn to_pg_options(&self) -> String; +} +impl DatabaseExt for Database { /// Serialize a list of database parameters into a Postgres-acceptable /// string of arguments. /// NB: `TEMPLATE` is actually also an identifier, but so far we only need /// to use `template0` and `template1`, so it is not a problem. Yet in the future /// it may require a proper quoting too. - pub fn to_pg_options(&self) -> String { + fn to_pg_options(&self) -> String { let mut params: String = self.options.as_pg_options(); write!(params, " OWNER {}", &self.owner.pg_quote()) .expect("String is documented to not to error during write operations"); @@ -182,10 +156,6 @@ impl Database { } } -/// String type alias representing Postgres identifier and -/// intended to be used for DB / role names. -pub type PgIdent = String; - /// Generic trait used to provide quoting / encoding for strings used in the /// Postgres SQL queries and DATABASE_URL. pub trait Escaping { @@ -226,7 +196,11 @@ pub fn get_existing_dbs(client: &mut Client) -> Result> { &[], )? .iter() - .map(|row| Database::new(row.get("datname"), row.get("owner"))) + .map(|row| Database { + name: row.get("datname"), + owner: row.get("owner"), + options: None, + }) .collect(); Ok(postgres_dbs) diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs index b7f15a99d179..2350113c39b0 100644 --- a/compute_tools/src/spec.rs +++ b/compute_tools/src/spec.rs @@ -1,57 +1,16 @@ -use std::collections::HashMap; use std::path::Path; use std::str::FromStr; use anyhow::Result; use postgres::config::Config; use postgres::{Client, NoTls}; -use serde::Deserialize; use tracing::{info, info_span, instrument, span_enabled, warn, Level}; use crate::config; use crate::params::PG_HBA_ALL_MD5; use crate::pg_helpers::*; -/// Cluster spec or configuration represented as an optional number of -/// delta operations + final cluster state description. -#[derive(Clone, Deserialize, Debug, Default)] -pub struct ComputeSpec { - pub format_version: f32, - pub timestamp: String, - pub operation_uuid: Option, - /// Expected cluster state at the end of transition process. - pub cluster: Cluster, - pub delta_operations: Option>, - - pub storage_auth_token: Option, - - pub startup_tracing_context: Option>, -} - -/// Cluster state seen from the perspective of the external tools -/// like Rails web console. -#[derive(Clone, Deserialize, Debug, Default)] -pub struct Cluster { - pub cluster_id: String, - pub name: String, - pub state: Option, - pub roles: Vec, - pub databases: Vec, - pub settings: GenericOptions, -} - -/// Single cluster state changing operation that could not be represented as -/// a static `Cluster` structure. For example: -/// - DROP DATABASE -/// - DROP ROLE -/// - ALTER ROLE name RENAME TO new_name -/// - ALTER DATABASE name RENAME TO new_name -#[derive(Clone, Deserialize, Debug)] -pub struct DeltaOp { - pub action: String, - pub name: PgIdent, - pub new_name: Option, -} +use compute_api::spec::{ComputeSpec, Database, PgIdent, Role}; /// Request spec from the control-plane by compute_id. If `NEON_CONSOLE_JWT` /// env variable is set, it will be used for authorization. diff --git a/compute_tools/tests/pg_helpers_tests.rs b/compute_tools/tests/pg_helpers_tests.rs index f48211f7ed6a..a63ee038c7f8 100644 --- a/compute_tools/tests/pg_helpers_tests.rs +++ b/compute_tools/tests/pg_helpers_tests.rs @@ -1,14 +1,13 @@ #[cfg(test)] mod pg_helpers_tests { - use std::fs::File; + use compute_api::spec::{ComputeSpec, GenericOption, GenericOptions, PgIdent}; use compute_tools::pg_helpers::*; - use compute_tools::spec::ComputeSpec; #[test] fn params_serialize() { - let file = File::open("tests/cluster_spec.json").unwrap(); + let file = File::open("../libs/compute_api/tests/cluster_spec.json").unwrap(); let spec: ComputeSpec = serde_json::from_reader(file).unwrap(); assert_eq!( @@ -23,7 +22,7 @@ mod pg_helpers_tests { #[test] fn settings_serialize() { - let file = File::open("tests/cluster_spec.json").unwrap(); + let file = File::open("../libs/compute_api/tests/cluster_spec.json").unwrap(); let spec: ComputeSpec = serde_json::from_reader(file).unwrap(); assert_eq!( diff --git a/libs/compute_api/Cargo.toml b/libs/compute_api/Cargo.toml new file mode 100644 index 000000000000..533a0912073d --- /dev/null +++ b/libs/compute_api/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "compute_api" +version = "0.1.0" +edition.workspace = true +license.workspace = true + +[dependencies] +anyhow.workspace = true +chrono.workspace = true +serde.workspace = true +serde_with.workspace = true +serde_json.workspace = true + +workspace_hack.workspace = true diff --git a/libs/compute_api/src/lib.rs b/libs/compute_api/src/lib.rs new file mode 100644 index 000000000000..b660799ec05b --- /dev/null +++ b/libs/compute_api/src/lib.rs @@ -0,0 +1,3 @@ +pub mod requests; +pub mod responses; +pub mod spec; diff --git a/compute_tools/src/http/requests.rs b/libs/compute_api/src/requests.rs similarity index 76% rename from compute_tools/src/http/requests.rs rename to libs/compute_api/src/requests.rs index 2e41c7aea4ef..5896c7dc6593 100644 --- a/compute_tools/src/http/requests.rs +++ b/libs/compute_api/src/requests.rs @@ -1,7 +1,10 @@ -use serde::Deserialize; +//! Structs representing the JSON formats used in the compute_ctl's HTTP API. use crate::spec::ComputeSpec; +use serde::Deserialize; +/// Request of the /configure API +/// /// We now pass only `spec` in the configuration request, but later we can /// extend it and something like `restart: bool` or something else. So put /// `spec` into a struct initially to be more flexible in the future. diff --git a/libs/compute_api/src/responses.rs b/libs/compute_api/src/responses.rs new file mode 100644 index 000000000000..43289a5e3efd --- /dev/null +++ b/libs/compute_api/src/responses.rs @@ -0,0 +1,66 @@ +//! Structs representing the JSON formats used in the compute_ctl's HTTP API. + +use chrono::{DateTime, Utc}; +use serde::{Serialize, Serializer}; + +#[derive(Serialize, Debug)] +pub struct GenericAPIError { + pub error: String, +} + +/// Response of the /status API +#[derive(Serialize, Debug)] +#[serde(rename_all = "snake_case")] +pub struct ComputeStatusResponse { + pub tenant: String, + pub timeline: String, + pub status: ComputeStatus, + #[serde(serialize_with = "rfc3339_serialize")] + pub last_active: DateTime, + pub error: Option, +} + +#[derive(Serialize)] +#[serde(rename_all = "snake_case")] +pub struct ComputeState { + pub status: ComputeStatus, + /// Timestamp of the last Postgres activity + #[serde(serialize_with = "rfc3339_serialize")] + pub last_active: DateTime, + pub error: Option, +} + +#[derive(Serialize, Clone, Copy, Debug, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum ComputeStatus { + // Spec wasn't provided at start, waiting for it to be + // provided by control-plane. + Empty, + // Compute configuration was requested. + ConfigurationPending, + // Compute node has spec and initial startup and + // configuration is in progress. + Init, + // Compute is configured and running. + Running, + // Either startup or configuration failed, + // compute will exit soon or is waiting for + // control-plane to terminate it. + Failed, +} + +fn rfc3339_serialize(x: &DateTime, s: S) -> Result +where + S: Serializer, +{ + x.to_rfc3339().serialize(s) +} + +/// Response of the /metrics.json API +#[derive(Clone, Debug, Default, Serialize)] +pub struct ComputeMetrics { + pub sync_safekeepers_ms: u64, + pub basebackup_ms: u64, + pub config_ms: u64, + pub total_startup_ms: u64, +} diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs new file mode 100644 index 000000000000..37fe133b68aa --- /dev/null +++ b/libs/compute_api/src/spec.rs @@ -0,0 +1,94 @@ +//! `ComputeSpec` represents the contents of the spec.json file. +//! +//! The spec.json file is used to pass information to 'compute_ctl'. It contains +//! all the information needed to start up the right version of PostgreSQL, +//! and connect it to the storage nodes. +use serde::Deserialize; +use std::collections::HashMap; + +/// String type alias representing Postgres identifier and +/// intended to be used for DB / role names. +pub type PgIdent = String; + +/// Cluster spec or configuration represented as an optional number of +/// delta operations + final cluster state description. +#[derive(Clone, Debug, Default, Deserialize)] +pub struct ComputeSpec { + pub format_version: f32, + pub timestamp: String, + pub operation_uuid: Option, + /// Expected cluster state at the end of transition process. + pub cluster: Cluster, + pub delta_operations: Option>, + + pub storage_auth_token: Option, + + pub startup_tracing_context: Option>, +} + +#[derive(Clone, Debug, Default, Deserialize)] +pub struct Cluster { + pub cluster_id: String, + pub name: String, + pub state: Option, + pub roles: Vec, + pub databases: Vec, + pub settings: GenericOptions, +} + +/// Single cluster state changing operation that could not be represented as +/// a static `Cluster` structure. For example: +/// - DROP DATABASE +/// - DROP ROLE +/// - ALTER ROLE name RENAME TO new_name +/// - ALTER DATABASE name RENAME TO new_name +#[derive(Clone, Debug, Deserialize)] +pub struct DeltaOp { + pub action: String, + pub name: PgIdent, + pub new_name: Option, +} + +/// Rust representation of Postgres role info with only those fields +/// that matter for us. +#[derive(Clone, Debug, Deserialize)] +pub struct Role { + pub name: PgIdent, + pub encrypted_password: Option, + pub options: GenericOptions, +} + +/// Rust representation of Postgres database info with only those fields +/// that matter for us. +#[derive(Clone, Debug, Deserialize)] +pub struct Database { + pub name: PgIdent, + pub owner: PgIdent, + pub options: GenericOptions, +} + +/// Common type representing both SQL statement params with or without value, +/// like `LOGIN` or `OWNER username` in the `CREATE/ALTER ROLE`, and config +/// options like `wal_level = logical`. +#[derive(Clone, Debug, Deserialize)] +pub struct GenericOption { + pub name: String, + pub value: Option, + pub vartype: String, +} + +/// Optional collection of `GenericOption`'s. Type alias allows us to +/// declare a `trait` on it. +pub type GenericOptions = Option>; + +#[cfg(test)] +mod tests { + use super::*; + use std::fs::File; + + #[test] + fn parse_spec_file() { + let file = File::open("tests/cluster_spec.json").unwrap(); + let _spec: ComputeSpec = serde_json::from_reader(file).unwrap(); + } +} diff --git a/compute_tools/tests/cluster_spec.json b/libs/compute_api/tests/cluster_spec.json similarity index 100% rename from compute_tools/tests/cluster_spec.json rename to libs/compute_api/tests/cluster_spec.json