Skip to content

Commit

Permalink
feat: add ability to pass topology as structure
Browse files Browse the repository at this point in the history
  • Loading branch information
dodokek authored and lowitea committed Feb 27, 2025
1 parent a809b9e commit f9478c3
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 35 deletions.
106 changes: 76 additions & 30 deletions src/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,51 @@ use std::{fs, path::PathBuf};

use crate::commands::lib;

#[derive(Debug, Deserialize)]
struct Tier {
#[derive(Debug, Deserialize, Clone)]
pub struct Tier {
replicasets: u8,
replication_factor: u8,
}

#[derive(Debug, Deserialize)]
struct MigrationContextVar {
impl Default for Tier {
fn default() -> Self {
Self {
replicasets: 2,
replication_factor: 2,
}
}
}

#[derive(Debug, Deserialize, Clone)]
pub struct MigrationContextVar {
name: String,
value: String,
}

#[derive(Debug, Deserialize)]
struct Service {
impl Default for MigrationContextVar {
fn default() -> Self {
Self {
name: "example_name".to_string(),
value: "example_value".to_string(),
}
}
}

#[derive(Debug, Deserialize, Clone)]
pub struct Service {
tiers: Vec<String>,
}

#[derive(Debug, Deserialize)]
struct Plugin {
impl Default for Service {
fn default() -> Self {
Self {
tiers: vec!["default".to_string()],
}
}
}

#[derive(Debug, Deserialize, Clone)]
pub struct Plugin {
#[serde(default)]
migration_context: Vec<MigrationContextVar>,
#[serde(default)]
Expand All @@ -47,15 +73,44 @@ struct Plugin {
version: Option<String>,
}

#[derive(Debug, Deserialize)]
struct Topology {
impl Default for Plugin {
fn default() -> Self {
let mut services = BTreeMap::new();
services.insert("main".to_string(), Service::default());

Self {
migration_context: vec![MigrationContextVar::default()],
services,
version: None,
}
}
}

#[derive(Debug, Deserialize, Clone)]
pub struct Topology {
#[serde(rename = "tier")]
tiers: BTreeMap<String, Tier>,
pub tiers: BTreeMap<String, Tier>,
#[serde(default)]
#[serde(rename = "plugin")]
plugins: BTreeMap<String, Plugin>,
pub plugins: BTreeMap<String, Plugin>,
#[serde(default)]
enviroment: BTreeMap<String, String>,
pub enviroment: BTreeMap<String, String>,
}

impl Default for Topology {
fn default() -> Self {
let mut tiers = BTreeMap::new();
tiers.insert("default".to_string(), Tier::default());

let mut plugins = BTreeMap::new();
plugins.insert("default_plugin_name".to_string(), Plugin::default());

Self {
tiers,
plugins,
enviroment: BTreeMap::new(),
}
}
}

impl Topology {
Expand Down Expand Up @@ -364,8 +419,8 @@ impl Drop for PicodataInstance {
#[allow(clippy::struct_excessive_bools)]
#[derive(Debug, Builder)]
pub struct Params {
#[builder(default = "PathBuf::from(\"topology.toml\")")]
topology_path: PathBuf,
#[builder]
topology: Topology,
#[builder(default = "PathBuf::from(\"./tmp\")")]
data_dir: PathBuf,
#[builder(default = "false")]
Expand All @@ -386,16 +441,7 @@ pub struct Params {
disable_colors: bool,
}

pub fn cluster(params: &Params) -> Result<Vec<PicodataInstance>> {
let mut topology: Topology = toml::from_str(
&fs::read_to_string(&params.topology_path)
.context(format!("failed to read {}", params.topology_path.display()))?,
)
.context(format!(
"failed to parse .toml file of {}",
params.topology_path.display()
))?;

pub fn cluster(params: &mut Params) -> Result<Vec<PicodataInstance>> {
let plugins_dir = if params.use_release {
cargo_build(lib::BuildType::Release, &params.target_dir)?;
params.target_dir.join("release")
Expand All @@ -404,7 +450,7 @@ pub fn cluster(params: &Params) -> Result<Vec<PicodataInstance>> {
params.target_dir.join("debug")
};

topology.find_plugin_versions(&plugins_dir)?;
params.topology.find_plugin_versions(&plugins_dir)?;

info!("Running the cluster...");

Expand All @@ -414,7 +460,7 @@ pub fn cluster(params: &Params) -> Result<Vec<PicodataInstance>> {

let first_instance_bin_port = 3001;
let mut instance_id = 0;
for (tier_name, tier) in &topology.tiers {
for (tier_name, tier) in &params.topology.tiers {
for _ in 0..(tier.replicasets * tier.replication_factor) {
instance_id += 1;
let pico_instance = PicodataInstance::new(
Expand All @@ -427,7 +473,7 @@ pub fn cluster(params: &Params) -> Result<Vec<PicodataInstance>> {
tier.replication_factor,
tier_name,
params,
&topology.enviroment,
&params.topology.enviroment,
)?;

picodata_processes.push(pico_instance);
Expand All @@ -449,7 +495,7 @@ pub fn cluster(params: &Params) -> Result<Vec<PicodataInstance>> {
thread::sleep(Duration::from_secs(5));
}

let result = enable_plugins(&topology, &params.data_dir, &params.picodata_path);
let result = enable_plugins(&params.topology, &params.data_dir, &params.picodata_path);
if let Err(e) = result {
for process in &mut picodata_processes {
process.kill().unwrap_or_else(|e| {
Expand All @@ -468,7 +514,7 @@ pub fn cluster(params: &Params) -> Result<Vec<PicodataInstance>> {
#[allow(clippy::too_many_arguments)]
#[allow(clippy::fn_params_excessive_bools)]
#[allow(clippy::cast_possible_wrap)]
pub fn cmd(params: &Params) -> Result<()> {
pub fn cmd(params: &mut Params) -> Result<()> {
let mut pico_instances = cluster(params)?;

if params.daemon {
Expand Down
5 changes: 5 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ pub mod cluster {
pub use crate::commands::run::cluster as run;
pub use crate::commands::run::ParamsBuilder as RunParamsBuilder;

pub use crate::commands::run::Plugin;
pub use crate::commands::run::Service;
pub use crate::commands::run::Tier;
pub use crate::commands::run::Topology;

pub use crate::commands::stop::cmd as stop;
pub use crate::commands::stop::ParamsBuilder as StopParamsBuilder;
}
Expand Down
17 changes: 13 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::{Context, Result};
use clap::{Parser, Subcommand};
use nix::unistd::{fork, ForkResult};
use std::{env, path::PathBuf, process, thread, time::Duration};
use std::{env, fs, path::PathBuf, process, thread, time::Duration};

mod commands;

Expand Down Expand Up @@ -187,8 +187,17 @@ fn main() -> Result<()> {
if !daemon {
run_child_killer();
}
let params = commands::run::ParamsBuilder::default()
.topology_path(topology)
let topology: commands::run::Topology = toml::from_str(
&fs::read_to_string(&topology)
.context(format!("failed to read {}", &topology.display()))?,
)
.context(format!(
"failed to parse .toml file of {}",
topology.display()
))?;

let mut params = commands::run::ParamsBuilder::default()
.topology(topology)
.data_dir(data_dir)
.disable_plugin_install(disable_plugin_install)
.base_http_port(base_http_port)
Expand All @@ -200,7 +209,7 @@ fn main() -> Result<()> {
.disable_colors(disable_colors)
.build()
.unwrap();
commands::run::cmd(&params).context("failed to execute Run command")?;
commands::run::cmd(&mut params).context("failed to execute Run command")?;
}
Command::Stop { data_dir } => {
run_child_killer();
Expand Down
46 changes: 45 additions & 1 deletion tests/run.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
mod helpers;

use helpers::{run_cluster, CmdArguments, PLUGIN_DIR};
use helpers::{exec_pike, run_cluster, CmdArguments, PLUGIN_DIR, TESTS_DIR};
use pike::cluster::run;
use pike::cluster::Plugin;
use pike::cluster::RunParamsBuilder;
use pike::cluster::Topology;
use std::collections::BTreeMap;
use std::{
fs::{self},
path::Path,
Expand Down Expand Up @@ -79,3 +84,42 @@ fn test_cluster_daemon_and_arguments() {
}
}
}

#[test]
fn test_topology_struct_run() {
// Cleaning up metadata from past run
if Path::new(PLUGIN_DIR).exists() {
fs::remove_dir_all(PLUGIN_DIR).unwrap();
}

assert!(
exec_pike(vec!["plugin", "new", "test-plugin"], TESTS_DIR, &vec![])
.unwrap()
.success()
);

let mut plugins = BTreeMap::new();
plugins.insert("test-plugin".to_string(), Plugin::default());
let topology = Topology {
plugins: plugins,
..Default::default()
};

dbg!(&topology);

let mut params = RunParamsBuilder::default()
.topology(topology)
.data_dir(Path::new("./tmp").to_path_buf())
.disable_plugin_install(false)
.base_http_port(8000)
.picodata_path(Path::new("picodata").to_path_buf())
.base_pg_port(5432)
.use_release(false)
.target_dir(Path::new("./target").to_path_buf())
.daemon(false)
.disable_colors(false)
.build()
.unwrap();

run(&mut params).unwrap();
}

0 comments on commit f9478c3

Please sign in to comment.