Skip to content

Commit

Permalink
Move compute_ctl structs used in HTTP API and spec file to separate crate.
Browse files Browse the repository at this point in the history

This is in preparation of using compute_ctl to launch postgres nodes
in the neon_local control plane. It also seems like a good idea to
separate the public interfaces anyway.

One non-mechanical change here is that the 'metrics' field is moved
under the Mutex, instead of using atomics. We were not using atomics
for performance but for convenience here, and it seems clearer not to
use atomics in the model for the HTTP response type.
  • Loading branch information
hlinnaka committed Apr 9, 2023
1 parent 818e341 commit f0b2e07
Show file tree
Hide file tree
Showing 18 changed files with 271 additions and 210 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ tokio-tar = { git = "https://github.com/neondatabase/tokio-tar.git", rev="404df6
heapless = { default-features=false, features=[], git = "https://github.com/japaric/heapless.git", rev = "644653bf3b831c6bb4963be2de24804acf5e5001" } # upstream release pending

## Local libraries
compute_api = { version = "0.1", path = "./libs/compute_api/" }
consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" }
metrics = { version = "0.1", path = "./libs/metrics/" }
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
Expand Down
1 change: 1 addition & 0 deletions compute_tools/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ tracing-subscriber.workspace = true
tracing-utils.workspace = true
url.workspace = true

compute_api.workspace = true
workspace_hack.workspace = true
5 changes: 3 additions & 2 deletions compute_tools/src/bin/compute_ctl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ use clap::Arg;
use tracing::{error, info};
use url::Url;

use compute_tools::compute::{ComputeMetrics, ComputeNode, ComputeState, ComputeStatus};
use compute_api::responses::ComputeStatus;

use compute_tools::compute::{ComputeNode, ComputeState};
use compute_tools::http::api::launch_http_server;
use compute_tools::logger::*;
use compute_tools::monitor::launch_monitor;
Expand Down Expand Up @@ -116,7 +118,6 @@ fn main() -> Result<()> {
pgdata: pgdata.to_string(),
pgbin: pgbin.to_string(),
live_config_allowed,
metrics: ComputeMetrics::default(),
state: Mutex::new(new_state),
state_changed: Condvar::new(),
};
Expand Down
81 changes: 23 additions & 58 deletions compute_tools/src/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@ use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::process::{Command, Stdio};
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Condvar, Mutex};

use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use postgres::{Client, NoTls};
use serde::Serialize;
use tokio_postgres;
use tracing::{info, instrument, warn};

use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::ComputeSpec;

use crate::checker::create_writability_check_data;
use crate::config;
use crate::pg_helpers::*;
Expand All @@ -41,7 +42,6 @@ pub struct ComputeNode {
pub connstr: url::Url,
pub pgdata: String,
pub pgbin: String,
pub metrics: ComputeMetrics,
/// We should only allow live re- / configuration of the compute node if
/// it uses 'pull model', i.e. it can go to control-plane and fetch
/// the latest configuration. Otherwise, there could be a case:
Expand Down Expand Up @@ -74,6 +74,8 @@ pub struct ComputeState {
pub timeline: String,
pub pageserver_connstr: String,
pub storage_auth_token: Option<String>,

pub metrics: ComputeMetrics,
}

impl ComputeState {
Expand All @@ -87,6 +89,7 @@ impl ComputeState {
timeline: String::new(),
pageserver_connstr: String::new(),
storage_auth_token: None,
metrics: ComputeMetrics::default(),
}
}
}
Expand All @@ -97,33 +100,6 @@ impl Default for ComputeState {
}
}

#[derive(Serialize, Clone, Copy, PartialEq, Eq, Debug)]
#[serde(rename_all = "snake_case")]
pub enum ComputeStatus {
// Spec wasn't provided at start, waiting for it to be
// provided by control-plane.
Empty,
// Compute configuration was requested.
ConfigurationPending,
// Compute node has spec and initial startup and
// configuration is in progress.
Init,
// Compute is configured and running.
Running,
// Either startup or configuration failed,
// compute will exit soon or is waiting for
// control-plane to terminate it.
Failed,
}

#[derive(Default, Serialize)]
pub struct ComputeMetrics {
pub sync_safekeepers_ms: AtomicU64,
pub basebackup_ms: AtomicU64,
pub config_ms: AtomicU64,
pub total_startup_ms: AtomicU64,
}

impl ComputeNode {
pub fn set_status(&self, status: ComputeStatus) {
let mut state = self.state.lock().unwrap();
Expand Down Expand Up @@ -185,15 +161,11 @@ impl ComputeNode {
ar.set_ignore_zeros(true);
ar.unpack(&self.pgdata)?;

self.metrics.basebackup_ms.store(
Utc::now()
.signed_duration_since(start_time)
.to_std()
.unwrap()
.as_millis() as u64,
Ordering::Relaxed,
);

self.state.lock().unwrap().metrics.basebackup_ms = Utc::now()
.signed_duration_since(start_time)
.to_std()
.unwrap()
.as_millis() as u64;
Ok(())
}

Expand Down Expand Up @@ -231,14 +203,11 @@ impl ComputeNode {
);
}

self.metrics.sync_safekeepers_ms.store(
Utc::now()
.signed_duration_since(start_time)
.to_std()
.unwrap()
.as_millis() as u64,
Ordering::Relaxed,
);
self.state.lock().unwrap().metrics.sync_safekeepers_ms = Utc::now()
.signed_duration_since(start_time)
.to_std()
.unwrap()
.as_millis() as u64;

let lsn = String::from(String::from_utf8(sync_output.stdout)?.trim());

Expand Down Expand Up @@ -375,23 +344,19 @@ impl ComputeNode {
self.apply_config(&compute_state)?;

let startup_end_time = Utc::now();
self.metrics.config_ms.store(
startup_end_time
{
let mut state = self.state.lock().unwrap();
state.metrics.config_ms = startup_end_time
.signed_duration_since(start_time)
.to_std()
.unwrap()
.as_millis() as u64,
Ordering::Relaxed,
);
self.metrics.total_startup_ms.store(
startup_end_time
.as_millis() as u64;
state.metrics.total_startup_ms = startup_end_time
.signed_duration_since(self.start_time)
.to_std()
.unwrap()
.as_millis() as u64,
Ordering::Relaxed,
);

.as_millis() as u64;
}
self.set_status(ComputeStatus::Running);

Ok(pg)
Expand Down
2 changes: 1 addition & 1 deletion compute_tools/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::path::Path;
use anyhow::Result;

use crate::pg_helpers::PgOptionsSerialize;
use crate::spec::ComputeSpec;
use compute_api::spec::ComputeSpec;

/// Check that `line` is inside a text file and put it there if it is not.
/// Create file if it doesn't exist.
Expand Down
26 changes: 18 additions & 8 deletions compute_tools/src/http/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::thread;

use crate::compute::{ComputeNode, ComputeStatus};
use crate::http::requests::ConfigurationRequest;
use crate::http::responses::{ComputeStatusResponse, GenericAPIError};
use crate::compute::{ComputeNode, ComputeState};
use compute_api::requests::ConfigurationRequest;
use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError};

use anyhow::Result;
use hyper::service::{make_service_fn, service_fn};
Expand All @@ -16,6 +16,16 @@ use tokio::task;
use tracing::{error, info};
use tracing_utils::http::OtelName;

/// Build the serializable `/status` response from a snapshot of the compute
/// node's state. Owned fields (`tenant`, `timeline`, `error`) are cloned so
/// the response is independent of the `ComputeState` borrow (the caller holds
/// it under a Mutex lock); `status` and `last_active` are copied by value.
fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
ComputeStatusResponse {
tenant: state.tenant.clone(),
timeline: state.timeline.clone(),
status: state.status,
last_active: state.last_active,
error: state.error.clone(),
}
}

// Service function to handle all available routes.
async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body> {
//
Expand All @@ -28,16 +38,16 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
(&Method::GET, "/status") => {
info!("serving /status GET request");
let state = compute.state.lock().unwrap();
let status_response = ComputeStatusResponse::from(state.clone());

let status_response = status_response_from_state(&state);
Response::new(Body::from(serde_json::to_string(&status_response).unwrap()))
}

// Startup metrics in JSON format. Keep /metrics reserved for a possible
// future use for Prometheus metrics format.
(&Method::GET, "/metrics.json") => {
info!("serving /metrics.json GET request");
Response::new(Body::from(serde_json::to_string(&compute.metrics).unwrap()))
let metrics = compute.state.lock().unwrap().metrics.clone();
Response::new(Body::from(serde_json::to_string(&metrics).unwrap()))
}

// Collect Postgres current usage insights
Expand Down Expand Up @@ -162,7 +172,7 @@ async fn handle_configure_request(
);

if state.status == ComputeStatus::Failed {
let err = state.error.clone().unwrap_or("unknown error".to_string());
let err = state.error.as_ref().map_or("unknown error", |x| x);
let msg = format!("compute configuration failed: {:?}", err);
return Err((msg, StatusCode::INTERNAL_SERVER_ERROR));
}
Expand All @@ -175,7 +185,7 @@ async fn handle_configure_request(

// Return current compute state if everything went well.
let state = compute.state.lock().unwrap().clone();
let status_response = ComputeStatusResponse::from(state);
let status_response = status_response_from_state(&state);
Ok(serde_json::to_string(&status_response).unwrap())
} else {
Err(("invalid spec".to_string(), StatusCode::BAD_REQUEST))
Expand Down
2 changes: 0 additions & 2 deletions compute_tools/src/http/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
pub mod api;
pub mod requests;
pub mod responses;
40 changes: 0 additions & 40 deletions compute_tools/src/http/responses.rs

This file was deleted.

Loading

0 comments on commit f0b2e07

Please sign in to comment.