Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move compute_ctl structs used in HTTP API and spec file to separate crate. #3937

Merged
merged 1 commit into from
Apr 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ tokio-tar = { git = "https://github.com/neondatabase/tokio-tar.git", rev="404df6
heapless = { default-features=false, features=[], git = "https://github.com/japaric/heapless.git", rev = "644653bf3b831c6bb4963be2de24804acf5e5001" } # upstream release pending

## Local libraries
compute_api = { version = "0.1", path = "./libs/compute_api/" }
consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" }
metrics = { version = "0.1", path = "./libs/metrics/" }
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
Expand Down
1 change: 1 addition & 0 deletions compute_tools/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ tracing-subscriber.workspace = true
tracing-utils.workspace = true
url.workspace = true

compute_api.workspace = true
workspace_hack.workspace = true
5 changes: 3 additions & 2 deletions compute_tools/src/bin/compute_ctl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ use clap::Arg;
use tracing::{error, info};
use url::Url;

use compute_tools::compute::{ComputeMetrics, ComputeNode, ComputeState, ComputeStatus};
use compute_api::responses::ComputeStatus;

use compute_tools::compute::{ComputeNode, ComputeState};
use compute_tools::http::api::launch_http_server;
use compute_tools::logger::*;
use compute_tools::monitor::launch_monitor;
Expand Down Expand Up @@ -116,7 +118,6 @@ fn main() -> Result<()> {
pgdata: pgdata.to_string(),
pgbin: pgbin.to_string(),
live_config_allowed,
metrics: ComputeMetrics::default(),
state: Mutex::new(new_state),
state_changed: Condvar::new(),
};
Expand Down
81 changes: 23 additions & 58 deletions compute_tools/src/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@ use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::process::{Command, Stdio};
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Condvar, Mutex};

use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use postgres::{Client, NoTls};
use serde::Serialize;
use tokio_postgres;
use tracing::{info, instrument, warn};

use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::ComputeSpec;

use crate::checker::create_writability_check_data;
use crate::config;
use crate::pg_helpers::*;
Expand All @@ -41,7 +42,6 @@ pub struct ComputeNode {
pub connstr: url::Url,
pub pgdata: String,
pub pgbin: String,
pub metrics: ComputeMetrics,
/// We should only allow live re- / configuration of the compute node if
/// it uses 'pull model', i.e. it can go to control-plane and fetch
/// the latest configuration. Otherwise, there could be a case:
Expand Down Expand Up @@ -74,6 +74,8 @@ pub struct ComputeState {
pub timeline: String,
pub pageserver_connstr: String,
pub storage_auth_token: Option<String>,

pub metrics: ComputeMetrics,
}

impl ComputeState {
Expand All @@ -87,6 +89,7 @@ impl ComputeState {
timeline: String::new(),
pageserver_connstr: String::new(),
storage_auth_token: None,
metrics: ComputeMetrics::default(),
}
}
}
Expand All @@ -97,33 +100,6 @@ impl Default for ComputeState {
}
}

/// Lifecycle status of the compute node.
///
/// Serialized with snake_case variant names (e.g. `configuration_pending`).
#[derive(Serialize, Clone, Copy, PartialEq, Eq, Debug)]
#[serde(rename_all = "snake_case")]
pub enum ComputeStatus {
/// Spec wasn't provided at start, waiting for it to be
/// provided by control-plane.
Empty,
/// Compute configuration was requested.
ConfigurationPending,
/// Compute node has spec and initial startup and
/// configuration is in progress.
Init,
/// Compute is configured and running.
Running,
/// Either startup or configuration failed,
/// compute will exit soon or is waiting for
/// control-plane to terminate it.
Failed,
}

/// Startup timing metrics of the compute node, each stored as a number of
/// milliseconds.
///
/// All fields default to zero and are serialized to JSON by the
/// `/metrics.json` HTTP handler.
#[derive(Default, Serialize)]
pub struct ComputeMetrics {
/// Elapsed time recorded after the safekeepers sync command has produced
/// its output (presumably the duration of the whole sync step -- confirm
/// against the enclosing method).
pub sync_safekeepers_ms: AtomicU64,
/// Elapsed time recorded once the basebackup archive has been unpacked
/// into `pgdata`.
pub basebackup_ms: AtomicU64,
/// Elapsed time from the local `start_time` of the startup routine until
/// the configuration has been applied.
pub config_ms: AtomicU64,
/// Elapsed time from the node's `start_time` until startup and
/// configuration have finished.
pub total_startup_ms: AtomicU64,
}

impl ComputeNode {
pub fn set_status(&self, status: ComputeStatus) {
let mut state = self.state.lock().unwrap();
Expand Down Expand Up @@ -185,15 +161,11 @@ impl ComputeNode {
ar.set_ignore_zeros(true);
ar.unpack(&self.pgdata)?;

self.metrics.basebackup_ms.store(
Utc::now()
.signed_duration_since(start_time)
.to_std()
.unwrap()
.as_millis() as u64,
Ordering::Relaxed,
);

self.state.lock().unwrap().metrics.basebackup_ms = Utc::now()
.signed_duration_since(start_time)
.to_std()
.unwrap()
.as_millis() as u64;
Ok(())
}

Expand Down Expand Up @@ -231,14 +203,11 @@ impl ComputeNode {
);
}

self.metrics.sync_safekeepers_ms.store(
Utc::now()
.signed_duration_since(start_time)
.to_std()
.unwrap()
.as_millis() as u64,
Ordering::Relaxed,
);
self.state.lock().unwrap().metrics.sync_safekeepers_ms = Utc::now()
.signed_duration_since(start_time)
.to_std()
.unwrap()
.as_millis() as u64;

let lsn = String::from(String::from_utf8(sync_output.stdout)?.trim());

Expand Down Expand Up @@ -375,23 +344,19 @@ impl ComputeNode {
self.apply_config(&compute_state)?;

let startup_end_time = Utc::now();
self.metrics.config_ms.store(
startup_end_time
{
let mut state = self.state.lock().unwrap();
state.metrics.config_ms = startup_end_time
.signed_duration_since(start_time)
.to_std()
.unwrap()
.as_millis() as u64,
Ordering::Relaxed,
);
self.metrics.total_startup_ms.store(
startup_end_time
.as_millis() as u64;
state.metrics.total_startup_ms = startup_end_time
.signed_duration_since(self.start_time)
.to_std()
.unwrap()
.as_millis() as u64,
Ordering::Relaxed,
);

.as_millis() as u64;
}
self.set_status(ComputeStatus::Running);

Ok(pg)
Expand Down
2 changes: 1 addition & 1 deletion compute_tools/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::path::Path;
use anyhow::Result;

use crate::pg_helpers::PgOptionsSerialize;
use crate::spec::ComputeSpec;
use compute_api::spec::ComputeSpec;

/// Check that `line` is inside a text file and put it there if it is not.
/// Create file if it doesn't exist.
Expand Down
26 changes: 18 additions & 8 deletions compute_tools/src/http/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::thread;

use crate::compute::{ComputeNode, ComputeStatus};
use crate::http::requests::ConfigurationRequest;
use crate::http::responses::{ComputeStatusResponse, GenericAPIError};
use crate::compute::{ComputeNode, ComputeState};
use compute_api::requests::ConfigurationRequest;
use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError};

use anyhow::Result;
use hyper::service::{make_service_fn, service_fn};
Expand All @@ -16,6 +16,16 @@ use tokio::task;
use tracing::{error, info};
use tracing_utils::http::OtelName;

/// Builds the HTTP status payload from the current compute state.
///
/// Owned fields (`tenant`, `timeline`, `error`) are cloned so the response
/// does not borrow from `state`; `status` and `last_active` are copied as-is.
fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
    // Clone the owned pieces up front so the struct literal below stays terse.
    let tenant = state.tenant.clone();
    let timeline = state.timeline.clone();
    let error = state.error.clone();

    ComputeStatusResponse {
        tenant,
        timeline,
        status: state.status,
        last_active: state.last_active,
        error,
    }
}

// Service function to handle all available routes.
async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body> {
//
Expand All @@ -28,16 +38,16 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
(&Method::GET, "/status") => {
info!("serving /status GET request");
let state = compute.state.lock().unwrap();
let status_response = ComputeStatusResponse::from(state.clone());

let status_response = status_response_from_state(&state);
Response::new(Body::from(serde_json::to_string(&status_response).unwrap()))
}

// Startup metrics in JSON format. Keep /metrics reserved for a possible
// future use for Prometheus metrics format.
(&Method::GET, "/metrics.json") => {
info!("serving /metrics.json GET request");
Response::new(Body::from(serde_json::to_string(&compute.metrics).unwrap()))
let metrics = compute.state.lock().unwrap().metrics.clone();
Response::new(Body::from(serde_json::to_string(&metrics).unwrap()))
}

// Collect Postgres current usage insights
Expand Down Expand Up @@ -162,7 +172,7 @@ async fn handle_configure_request(
);

if state.status == ComputeStatus::Failed {
let err = state.error.clone().unwrap_or("unknown error".to_string());
let err = state.error.as_ref().map_or("unknown error", |x| x);
let msg = format!("compute configuration failed: {:?}", err);
return Err((msg, StatusCode::INTERNAL_SERVER_ERROR));
}
Expand All @@ -175,7 +185,7 @@ async fn handle_configure_request(

// Return current compute state if everything went well.
let state = compute.state.lock().unwrap().clone();
let status_response = ComputeStatusResponse::from(state);
let status_response = status_response_from_state(&state);
Ok(serde_json::to_string(&status_response).unwrap())
} else {
Err(("invalid spec".to_string(), StatusCode::BAD_REQUEST))
Expand Down
2 changes: 0 additions & 2 deletions compute_tools/src/http/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
pub mod api;
pub mod requests;
pub mod responses;
40 changes: 0 additions & 40 deletions compute_tools/src/http/responses.rs

This file was deleted.

Loading