Skip to content
This repository has been archived by the owner on Nov 21, 2024. It is now read-only.

207 add web app console advanced features and components #228

Merged
2 changes: 1 addition & 1 deletion core/data-layer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ uuid = { version = "1.3.0", features = [
"macro-diagnostics",
"serde",
] }
ts-rs = "6.2.1"
ts-rs = { version = "7.1.1", features = ["no-serde-warnings"] }
rand = { version = "0.8.5" }
ulid = { version = "1.0.0" }
handlebars = "4.3.7"
Expand Down
2 changes: 0 additions & 2 deletions core/data-layer/src/types/autoscaling_history_definition.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@


use serde::{Deserialize, Serialize};
use serde_valid::Validate;
use ts_rs::TS;
Expand Down
2 changes: 0 additions & 2 deletions core/wave-autoscale/moon.yml
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ tasks:
- "run"
- "-p"
- "wave-autoscale"
# - "--quiet"
# - "--verbose"
run-help:
command:
- "cargo"
Expand Down
23 changes: 20 additions & 3 deletions core/wave-autoscale/src/metric_collector_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,10 @@ impl MetricsCollectorManager {

let mut collector_processes: Vec<process::AppInfo> = Vec::new();

//
// collector: vector
//

let mut vector_metric_definitions: Vec<&MetricDefinition> = Vec::new();
for metric_definition in metric_definitions {
if metric_definition.collector == VECTOR_COLLECTOR {
Expand Down Expand Up @@ -370,7 +373,10 @@ impl MetricsCollectorManager {
}
}

//
// collector: telegraf
//

let mut telegraf_metric_definitions: Vec<&MetricDefinition> = Vec::new();
for metric_definition in metric_definitions {
if metric_definition.collector == TELEGRAF_COLLECTOR {
Expand Down Expand Up @@ -400,7 +406,9 @@ impl MetricsCollectorManager {
}
}

// kill agent process
//
// kill agent process (vector, telegraf)
//
if let Some(running_apps) = &mut self.running_apps {
running_apps
.iter_mut()
Expand Down Expand Up @@ -428,7 +436,10 @@ impl MetricsCollectorManager {
self.running_apps = Some(running_apps);
}

//
// wa-generator
// Find the metric definitions that use WA Generator collector
//

// Stop the running tasks
if let Some(running_tasks) = &mut self.running_tasks {
Expand All @@ -446,12 +457,15 @@ impl MetricsCollectorManager {
}
if !wa_generator_metric_definitions.is_empty() {
// Run the collector binaries
let mut running_tasks: Vec<tokio::task::JoinHandle<()>> = Vec::new();
for metric_definition in wa_generator_metric_definitions {
let output_url = self.output_url.clone();
let handle = wa_generator::run(metric_definition.clone(), output_url);
if let Ok(handle) = handle {
running_tasks.push(handle);
if let Some(running_tasks) = &mut self.running_tasks {
running_tasks.push(handle);
} else {
self.running_tasks = Some(vec![handle]);
}
}
}
}
Expand Down Expand Up @@ -1429,7 +1443,10 @@ mod tests {
assert!(!validate_telegraf_definition(&metric_definition_fail_1));
}

// TODO: Run test

#[test]
#[traced_test]
fn retry_test() {
fn return_ok_of_num(ok_num: i32) -> Result<(), ()> {
if ok_num == 2 {
Expand Down
141 changes: 111 additions & 30 deletions core/wave-autoscale/src/scaling_planner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,23 @@ use tokio::{sync::RwLock, task::JoinHandle, time};
use tracing::{debug, error, info};
use ulid::Ulid;

// Constants
const PLAN_EXPRESSION_PERIOD_SEC: u64 = 5 * 60;

/**
* ExressionResult
*/

#[derive(Debug, Clone)]
struct ExressionResult {
result: bool,
error: bool,
message: Option<String>,
}

/**
* PlanExpressionStats
*/
#[derive(Debug, Clone)]
enum PlanExpressionStats {
Latest,
Expand Down Expand Up @@ -100,6 +115,47 @@ async fn apply_scaling_components(
scaling_results
}

/**
Create a AutoscalingHistoryDefinition
- plan_db_id
- plan_id
- plan_item_json
- metric_values_json
- metadata_values_json
*/
async fn create_autoscaling_history(
// plan_db_id, paln
data_layer: &Arc<DataLayer>,
plan_db_id: String,
plan_id: String,
plan_item: &PlanItemDefinition,
expression_value_map: Option<&Vec<HashMap<String, Option<f64>>>>,
scaling_components_metadata: Option<&Value>,
fail_message: Option<String>,
) {
let metric_values_json = if let Some(expression_value_map) = expression_value_map {
json!(expression_value_map).to_string()
} else {
"".to_string()
};
let metadata_values_json = if let Some(scaling_components_metadata) = scaling_components_metadata
{
json!(scaling_components_metadata).to_string()
} else {
"".to_string()
};
let autoscaling_history: AutoscalingHistoryDefinition = AutoscalingHistoryDefinition::new(
plan_db_id,
plan_id,
json!(plan_item).to_string(),
metric_values_json,
metadata_values_json,
fail_message,
);
debug!("[ScalingPlanner] autoscaling_history - {:?}", autoscaling_history);
let _ = data_layer.add_autoscaling_history(autoscaling_history).await;
}

pub struct ScalingPlanner {
definition: ScalingPlanDefinition,
metric_updater: SharedMetricUpdater,
Expand Down Expand Up @@ -149,11 +205,14 @@ impl<'a> ScalingPlanner {
let shared_scaling_component_manager = self.scaling_component_manager.clone();
let shared_last_run = self.last_plan_item_id.clone();
let shared_last_plan_timestamp = self.last_plan_timestamp.clone();
let scaling_plan_definition = self.definition.clone();
let data_layer: Arc<DataLayer> = self.data_layer.clone();

// PlanDefinition
let scaling_plan_definition = self.definition.clone();
let plan_id = scaling_plan_definition.id.clone();
let plan_db_id = scaling_plan_definition.db_id.clone();
// metadata
let plan_metadata = scaling_plan_definition.metadata;
let plan_metadata = scaling_plan_definition.metadata.clone();

// For plan_interval
let plan_interval: u16 = plan_metadata
Expand All @@ -178,7 +237,7 @@ impl<'a> ScalingPlanner {
plan_interval
};

let plans = self.sort_plan_by_priority();
let plan_items = self.sort_plan_by_priority();

let mut interval = time::interval(Duration::from_millis(plan_interval as u64));

Expand Down Expand Up @@ -223,18 +282,28 @@ impl<'a> ScalingPlanner {

let mut excuted = false;
// Find the plan that matches the expression
for plan in plans.iter() {
if plan.cron_expression.is_none() && plan.expression.is_none() {
for plan_item in plan_items.iter() {
if plan_item.cron_expression.is_none() && plan_item.expression.is_none() {
error!("[ScalingPlanner] Both cron_expression and expression are empty");
continue;
}
// 1. Cron Expression
if let Some(cron_expression) = plan.cron_expression.as_ref() {
if let Some(cron_expression) = plan_item.cron_expression.as_ref() {
debug!("[ScalingPlanner] cron_expression - {}", cron_expression);
if !cron_expression.is_empty() {
let schedule = cron::Schedule::from_str(cron_expression.as_str());
if schedule.is_err() {
error!("[ScalingPlanner] Error parsing cron expression: {}", cron_expression);
// If the expression is invalid, create a AutoscalingHistoryDefinition for the error
create_autoscaling_history(
&data_layer.clone(),
plan_db_id.clone(),
plan_id.clone(),
plan_item,
None,
None,
Some("Failed to parse cron expression".to_string())
).await;
continue;
}
let schedule = schedule.unwrap();
Expand All @@ -251,7 +320,7 @@ impl<'a> ScalingPlanner {
let duration = datetime - now;
let duration = duration.num_milliseconds();
if duration < 0 || duration > DEFAULT_PLAN_INTERVAL as i64 {
error!(
info!(
"[ScalingPlanner] The datetime is not yet reached for cron expression: {}",
cron_expression
);
Expand All @@ -264,35 +333,49 @@ impl<'a> ScalingPlanner {
// 2. JS Expression
let mut expression_value_map: Vec<HashMap<String, Option<f64>>> =
Vec::new();
if let Some(expression) = plan.expression.as_ref() {
if let Some(expression) = plan_item.expression.as_ref() {
if !expression.is_empty() {
debug!("[ScalingPlanner] expression\n{}", expression);
// Evaluate the expression
let result = async_with!(context => |ctx| {
// Evaluate the expression.
let expression_result = async_with!(context => |ctx| {
let result = ctx.eval::<bool, _>(expression.clone());
if result.is_err() {
let message = result.err().unwrap().to_string();
error!("[ScalingPlanner] Failed to evaluate expression\n{}\n\n{}", expression, message);
return false;
let message = format!("Failed to evaluate expression\n{}", expression);
return ExressionResult {
result: false,
error: true,
message: Some(message),
};
}
let result = result.unwrap();
ExressionResult {
result,
error: false,
message: None,
}
true
})
.await;

debug!("[ScalingPlanner] expression result - {:?}", result);
debug!("[ScalingPlanner] expression result - {:?}", expression_result);
// expression get value (for history)
let expression_map =
expression_get_value(expression.clone(), context.clone()).await;
expression_value_map.append(&mut expression_map.clone());

// If the expression is false, move to the next plan
if !result {
if !expression_result.result {
// If the expression is invalid, create a AutoscalingHistoryDefinition for the error
if expression_result.error {
create_autoscaling_history(&data_layer.clone(),plan_db_id.clone(),plan_id.clone(),plan_item,None,None,expression_result.message).await;
}
continue;
}
}
}

let results = run_plan_item(plan, &shared_scaling_component_manager).await;
let results = run_plan_item(plan_item, &shared_scaling_component_manager).await;

// update last plan timestamp
if !results.is_empty() {
Expand All @@ -304,7 +387,7 @@ impl<'a> ScalingPlanner {
// Update the last run
{
let mut shared_last_run = shared_last_run.write().await;
let scaling_plan_id = &plan.id;
let scaling_plan_id = &plan_item.id;
*shared_last_run = scaling_plan_id.clone();
info!("[ScalingPlanner] Applied scaling plan: {}", scaling_plan_id);
}
Expand All @@ -315,20 +398,18 @@ impl<'a> ScalingPlanner {
Ok(_) => None,
Err(error) => Some(error.to_string()),
};
let scaling_components_metadata = &plan.scaling_components;
let autoscaling_history: AutoscalingHistoryDefinition =
AutoscalingHistoryDefinition::new(
scaling_plan_definition.db_id.clone(),
scaling_plan_definition.id.clone(),
json!(plan).to_string(),
json!(expression_value_map.clone()).to_string(),
json!(scaling_components_metadata[index].clone()).to_string(),
fail_message,
);
debug!("[ScalingPlanner] autoscaling_history - {:?}", autoscaling_history);
let _ = data_layer
.add_autoscaling_history(autoscaling_history)
.await;
let scaling_components_metadata = &plan_item.scaling_components;

// Create a AutoscalingHistoryDefinition
create_autoscaling_history(
&data_layer,
scaling_plan_definition.db_id.clone(),
scaling_plan_definition.id.clone(),
plan_item,
Some(&expression_value_map),
Some(&scaling_components_metadata[index]),
fail_message,
).await;
}
// Stop the loop. We only want to execute one plan per interval.
excuted = true;
Expand Down
21 changes: 21 additions & 0 deletions core/web-app/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion core/web-app/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"react-ace": "^10.1.0",
"react-dom": "^18.2.0",
"react-hook-form": "^7.43.9",
"react-toastify": "^10.0.4",
"reactflow": "^11.10.1",
"typescript": "5.0.4",
"ulid": "^2.3.0",
Expand All @@ -55,4 +56,4 @@
"prettier-plugin-tailwindcss": "^0.2.7",
"tailwindcss": "^3.3.1"
}
}
}
Loading
Loading