Skip to content

Commit

Permalink
Polish API handler and refresh OpenAPI spec
Browse files Browse the repository at this point in the history
  • Loading branch information
ololobus authored and hlinnaka committed Apr 5, 2023
1 parent 7cdf703 commit 89cc2c5
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 41 deletions.
3 changes: 1 addition & 2 deletions compute_tools/src/bin/compute_ctl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,7 @@ fn main() -> Result<()> {
live_config_allowed,
inner: Mutex::new(ComputeNodeInner {
state: ComputeState {
// FIXME: should this be directly Init, if spec is set?
status: ComputeStatus::WaitingSpec,
status: ComputeStatus::Empty,
last_active: Utc::now(),
error: None,
},
Expand Down
11 changes: 5 additions & 6 deletions compute_tools/src/configurator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,21 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
let mut inner = compute.state_changed.wait(inner).unwrap();

if inner.state.status == ComputeStatus::ConfigurationPending {
info!("got reconfiguration request");
inner.state.status = ComputeStatus::Reconfiguration;
info!("got configuration request");
inner.state.status = ComputeStatus::Configuration;
compute.state_changed.notify_all();
drop(inner);

let mut new_status = ComputeStatus::Failed;
if let Err(e) = compute.reconfigure() {
error!("could not reconfigure compute node: {}", e);
error!("could not configure compute node: {}", e);
} else {
new_status = ComputeStatus::Running;
info!("compute node reconfigured");
info!("compute node configured");
}

// XXX: used to test that API is blocking
// TODO: remove before merge
std::thread::sleep(std::time::Duration::from_millis(2000));
// std::thread::sleep(std::time::Duration::from_millis(2000));

compute.set_status(new_status);
} else if inner.state.status == ComputeStatus::Failed {
Expand Down
41 changes: 25 additions & 16 deletions compute_tools/src/http/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use std::sync::Arc;
use std::thread;

use crate::compute::{ComputeNode, ParsedSpec};
use crate::http::models::{ConfigurationRequest, GenericAPIError};
use compute_api::models::ComputeStatus;
use compute_api::spec::ComputeSpec;

use anyhow::Result;
use hyper::service::{make_service_fn, service_fn};
Expand All @@ -15,26 +15,27 @@ use serde_json;
use tracing::{error, info};
use tracing_utils::http::OtelName;

async fn handle_spec_request(req: Request<Body>, compute: &Arc<ComputeNode>) -> Result<(), String> {
async fn handle_spec_request(req: Request<Body>, compute: &Arc<ComputeNode>) -> Result<(), (String, StatusCode)> {
if !compute.live_config_allowed {
return Err("live reconfiguration is not allowed for this compute node".to_string());
return Err(("live reconfiguration is not allowed for this compute node".to_string(), StatusCode::PRECONDITION_FAILED));
}

let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap();
let spec_raw = String::from_utf8(body_bytes.to_vec()).unwrap();

let spec = serde_json::from_str::<ComputeSpec>(&spec_raw)
.map_err(|err| format!("could not parse spec json: {err}"))?;
let spec = ParsedSpec::try_from(spec).map_err(|err| format!("could not parse spec: {err}"))?;
let request = serde_json::from_str::<ConfigurationRequest>(&spec_raw)
.map_err(|err| (format!("could not parse request json: {err}"), StatusCode::BAD_REQUEST))?;
let spec = ParsedSpec::try_from(request.spec)
.map_err(|err| (format!("could not parse spec: {err}"), StatusCode::BAD_REQUEST))?;

let mut inner = compute.inner.lock().unwrap();
if !(inner.state.status == ComputeStatus::WaitingSpec
if !(inner.state.status == ComputeStatus::Empty
|| inner.state.status == ComputeStatus::Running)
{
return Err(format!(
return Err((format!(
"invalid compute status for reconfiguration request: {}",
serde_json::to_string(&inner.state).unwrap()
));
), StatusCode::PRECONDITION_FAILED));
}
inner.spec = Some(spec);
inner.state.status = ComputeStatus::ConfigurationPending;
Expand Down Expand Up @@ -119,22 +120,20 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
))
}

// Accept spec in JSON format and request compute reconfiguration from
// Accept spec in JSON format and request compute configuration from
// the configurator thread. If anything goes wrong after we set the
// compute state to `ConfigurationPending` and / or sent spec to the
// configurator thread, we basically leave compute in the potentially
// wrong state. That said, it's control-plane's responsibility to
// watch compute state after reconfiguration request and to do a clean
// restart in case of errors.
//
// TODO: Errors should be in JSON format with proper status codes.
(&Method::POST, "/spec") => {
info!("serving /spec POST request");
(&Method::POST, "/configure") => {
info!("serving /configure POST request");
match handle_spec_request(req, compute).await {
Ok(()) => Response::new(Body::from("ok")),
Err(msg) => {
Err((msg, code) ) => {
error!("error handling /spec request: {msg}");
Response::new(Body::from(msg))
render_json_error(&msg, code)
}
}
}
Expand All @@ -148,6 +147,16 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
}
}

fn render_json_error(e: &str, status: StatusCode) -> Response<Body> {
let error = GenericAPIError {
error: e.to_string(),
};
Response::builder()
.status(status)
.body(Body::from(serde_json::to_string(&error).unwrap()))
.unwrap()
}

// Main Hyper HTTP server function that runs it and blocks waiting on it forever.
#[tokio::main]
async fn serve(state: Arc<ComputeNode>) {
Expand Down
1 change: 1 addition & 0 deletions compute_tools/src/http/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod api;
pub mod models;
16 changes: 16 additions & 0 deletions compute_tools/src/http/models.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use serde::{Deserialize, Serialize};

use compute_api::spec::ComputeSpec;

/// Body of a compute configuration request, deserialized from JSON.
///
/// We currently pass only `spec` in the configuration request, but later we
/// can extend it and add something like `restart: bool` or something else.
/// So `spec` is wrapped into a struct from the start to be more flexible in
/// the future.
#[derive(Deserialize, Debug)]
pub struct ConfigurationRequest {
/// Compute spec supplied by the caller of the configuration API.
pub spec: ComputeSpec,
}

/// Generic error payload returned by the HTTP API.
///
/// It is serialized to JSON and used as the body of error responses.
#[derive(Serialize, Debug)]
pub struct GenericAPIError {
/// Human-readable description of what went wrong.
pub error: String,
}
98 changes: 84 additions & 14 deletions compute_tools/src/http/openapi_spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ paths:
get:
tags:
- Info
summary: Get compute node internal status
summary: Get compute node internal status.
description: ""
operationId: getComputeStatus
responses:
Expand All @@ -26,7 +26,7 @@ paths:
get:
tags:
- Info
summary: Get compute node startup metrics in JSON format
summary: Get compute node startup metrics in JSON format.
description: ""
operationId: getComputeMetricsJSON
responses:
Expand All @@ -41,9 +41,9 @@ paths:
get:
tags:
- Info
summary: Get current compute insights in JSON format
summary: Get current compute insights in JSON format.
description: |
Note, that this doesn't include any historical data
Note, that this doesn't include any historical data.
operationId: getComputeInsights
responses:
200:
Expand All @@ -56,12 +56,12 @@ paths:
/info:
get:
tags:
- "info"
summary: Get info about the compute Pod/VM
- Info
summary: Get info about the compute pod / VM.
description: ""
operationId: getInfo
responses:
"200":
200:
description: Info
content:
application/json:
Expand All @@ -72,7 +72,7 @@ paths:
post:
tags:
- Check
summary: Check that we can write new data on this compute
summary: Check that we can write new data on this compute.
description: ""
operationId: checkComputeWritability
responses:
Expand All @@ -82,9 +82,57 @@ paths:
text/plain:
schema:
type: string
description: Error text or 'true' if check passed
description: Error text or 'true' if check passed.
example: "true"

/configure:
post:
tags:
- Configure
summary: Request compute node configuration.
description: |
This is a blocking API endpoint, i.e. it blocks waiting until the
compute has finished configuration and is in the `Running` state.
Optional non-blocking mode could be added later. Currently,
it's also assumed that reconfiguration doesn't require restart.
operationId: configureCompute
requestBody:
description: Configuration request.
required: true
content:
application/json:
schema:
type: object
required:
- spec
properties:
spec:
# XXX: I don't want to explain current spec in the OpenAPI format,
# as it could be changed really soon. Consider doing it later.
type: object
responses:
200:
description: Compute configuration finished.
content:
application/json:
schema:
$ref: "#/components/schemas/ComputeState"
400:
description: Provided spec is invalid.
content:
application/json:
schema:
$ref: "#/components/schemas/GenericError"
412:
description: |
It's not possible to do live-configuration of the compute.
It's either in the wrong state, or compute doesn't use pull
mode of configuration.
content:
application/json:
schema:
$ref: "#/components/schemas/GenericError"

components:
securitySchemes:
JWT:
Expand All @@ -95,7 +143,7 @@ components:
schemas:
ComputeMetrics:
type: object
description: Compute startup metrics
description: Compute startup metrics.
required:
- sync_safekeepers_ms
- basebackup_ms
Expand All @@ -113,7 +161,7 @@ components:

Info:
type: object
description: Information about VM/Pod
description: Information about VM/Pod.
required:
- num_cpus
properties:
Expand All @@ -130,17 +178,26 @@ components:
$ref: '#/components/schemas/ComputeStatus'
last_active:
type: string
description: The last detected compute activity timestamp in UTC and RFC3339 format
description: The last detected compute activity timestamp in UTC and RFC3339 format.
example: "2022-10-12T07:20:50.52Z"
error:
type: string
description: Text of the error during compute startup, if any
description: Text of the error during compute startup, if any.
example: ""
tenant:
type: string
description: Identifier of the current tenant served by compute node, if any.
example: c9269c359e9a199fad1ea0981246a78f
timeline:
type: string
description: Identifier of the current timeline served by compute node, if any.
example: ece7de74d4b8cbe5433a68ce4d1b97b4

ComputeInsights:
type: object
properties:
pg_stat_statements:
description: Contains raw output from pg_stat_statements in JSON format
description: Contains raw output from pg_stat_statements in JSON format.
type: array
items:
type: object
Expand All @@ -151,6 +208,19 @@ components:
- init
- failed
- running
example: running

#
# Errors
#

GenericError:
type: object
required:
- error
properties:
error:
type: string

security:
- JWT: []
7 changes: 4 additions & 3 deletions libs/compute_api/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ pub struct ComputeState {
pub enum ComputeStatus {
// Spec wasn't provided at start, waiting for it to be
// provided by control-plane.
WaitingSpec,
// Compute node has initial spec and is starting up.
Empty,
// Compute node has spec and initial startup and
// configuration is in progress.
Init,
// Compute is configured and running.
Running,
Expand All @@ -31,7 +32,7 @@ pub enum ComputeStatus {
// Control-plane requested reconfiguration.
ConfigurationPending,
// New spec is being applied.
Reconfiguration,
Configuration,
}

fn rfc3339_serialize<S>(x: &DateTime<Utc>, s: S) -> Result<S::Ok, S::Error>
Expand Down

0 comments on commit 89cc2c5

Please sign in to comment.