Skip to content

Commit

Permalink
Add persistence to MSD definitions (#169)
Browse files Browse the repository at this point in the history
* Add networks-state-file arg to persist definitions

* repin

* repin
  • Loading branch information
pietrodimarco-dfinity authored Feb 7, 2024
1 parent 114aaed commit 73ac4cf
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 4 deletions.
18 changes: 15 additions & 3 deletions Cargo.Bazel.lock
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"checksum": "86820685790dc151c444b969cb6cb2ce2835933116a79bd69d9979b0df78d0db",
"checksum": "2bda21c3422f1e2a634bfe6e3d48e0041bd5dbb73f9d63b191e5393c2e338c19",
"crates": {
"actix-codec 0.5.2": {
"name": "actix-codec",
Expand Down Expand Up @@ -28157,23 +28157,29 @@
],
"crate_features": {
"common": [
"elf",
"errno",
"general",
"ioctl",
"no_std"
],
"selects": {
"aarch64-unknown-linux-gnu": [
"elf",
"errno",
"std"
],
"arm-unknown-linux-gnueabi": [
"elf",
"errno",
"std"
],
"armv7-unknown-linux-gnueabi": [
"elf",
"errno",
"std"
],
"i686-unknown-linux-gnu": [
"elf",
"errno",
"std"
],
"powerpc-unknown-linux-gnu": [
Expand All @@ -28183,6 +28189,8 @@
"std"
],
"x86_64-unknown-linux-gnu": [
"elf",
"errno",
"std"
]
}
Expand Down Expand Up @@ -29397,6 +29405,10 @@
"id": "regex 1.10.3",
"target": "regex"
},
{
"id": "retry 2.0.0",
"target": "retry"
},
{
"id": "serde 1.0.196",
"target": "serde"
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rs/ic-observability/multiservice-discovery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ tokio = { workspace = true }
url = { workspace = true }
futures.workspace = true
axum = "0.7.4"
retry = { workspace = true }

[dev-dependencies]
tempfile = { workspace = true }
Expand Down
83 changes: 82 additions & 1 deletion rs/ic-observability/multiservice-discovery/src/definition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use crossbeam_channel::Sender;
use futures_util::future::join_all;
use ic_management_types::Network;
use ic_registry_client::client::ThresholdSigPublicKey;
use serde::Deserialize;
use serde::Serialize;
use service_discovery::job_types::JobType;
use service_discovery::registry_sync::Interrupted;
use service_discovery::IcServiceDiscovery;
Expand All @@ -16,6 +18,8 @@ use std::collections::HashSet;
use std::error::Error;
use std::fmt::Debug;
use std::fmt::{Display, Error as FmtError, Formatter};
use std::fs;
use std::io::Write;
use std::net::SocketAddr;
use std::sync::Arc;
use std::{
Expand All @@ -25,6 +29,33 @@ use std::{
use tokio::sync::Mutex;
use url::Url;

use crate::make_logger;

/// Serializable counterpart of [`Definition`], used to persist definitions to
/// the networks state file as JSON and to load them back.
///
/// It holds only the fields that the `From<Definition>` conversion copies
/// over; converting back into a `Definition` supplies a logger obtained from
/// `make_logger()`.
#[derive(Clone, Serialize, Deserialize)]
pub struct FSDefinition {
/// Copied from `Definition::nns_urls`.
pub nns_urls: Vec<Url>,
/// Copied from `Definition::registry_path`.
pub registry_path: PathBuf,
/// Copied from `Definition::name`.
pub name: String,
/// Copied from `Definition::public_key`; `None` when the definition has no key.
pub public_key: Option<ThresholdSigPublicKey>,
/// Copied from `Definition::poll_interval`.
pub poll_interval: Duration,
/// Copied from `Definition::registry_query_timeout`.
pub registry_query_timeout: Duration,
/// Copied from `Definition::boundary_nodes`.
pub boundary_nodes: Vec<BoundaryNode>,
}

impl From<Definition> for FSDefinition {
fn from(definition: Definition) -> Self {
Self {
nns_urls: definition.nns_urls,
registry_path: definition.registry_path,
name: definition.name,
public_key: definition.public_key,
poll_interval: definition.poll_interval,
registry_query_timeout: definition.registry_query_timeout,
boundary_nodes: definition.boundary_nodes
}
}
}

#[derive(Clone)]
pub struct Definition {
pub nns_urls: Vec<Url>,
Expand All @@ -38,6 +69,20 @@ pub struct Definition {
pub boundary_nodes: Vec<BoundaryNode>,
}

impl From<FSDefinition> for Definition {
    /// Rebuilds a `Definition` from its persisted form.
    ///
    /// The definition is constructed through `Definition::new` with a logger
    /// from `make_logger()`, then the persisted boundary nodes are restored.
    fn from(fs_definition: FSDefinition) -> Self {
        let mut definition = Definition::new(
            fs_definition.nns_urls,
            fs_definition.registry_path,
            fs_definition.name,
            make_logger(),
            fs_definition.public_key,
            fs_definition.poll_interval,
            fs_definition.registry_query_timeout,
        );
        // `Definition::new` is not given the boundary nodes, so they must be
        // restored explicitly; without this the nodes written by the
        // `From<Definition>` conversion would be lost on every reload.
        definition.boundary_nodes = fs_definition.boundary_nodes;
        definition
    }
}

impl Debug for Definition {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
write!(
Expand Down Expand Up @@ -300,7 +345,7 @@ impl RunningDefinition {
}
}

#[derive(Clone)]
#[derive(Clone, Serialize, Deserialize)]
pub struct BoundaryNode {
pub name: String,
pub targets: BTreeSet<SocketAddr>,
Expand Down Expand Up @@ -394,6 +439,42 @@ impl DefinitionsSupervisor {
}
}

/// Preloads definitions from `networks_state_file` when that file exists.
///
/// The file is read as a JSON array of `FSDefinition`, each entry is
/// converted with `into()`, and the result is handed to `start` with
/// `StartMode::AddToDefinitions`. When the file does not exist, nothing is
/// loaded and `Ok(())` is returned.
///
/// # Errors
///
/// Returns an error if the file cannot be read, if its content is not valid
/// JSON for a list of `FSDefinition`, or if `start` fails.
pub(crate) async fn load_or_create_defs(&self, networks_state_file: PathBuf) -> Result<(), Box<dyn Error>> {
    // No state file yet: there is nothing to preload.
    if !networks_state_file.exists() {
        return Ok(());
    }

    let raw_state = fs::read_to_string(networks_state_file)?;
    let persisted: Vec<FSDefinition> = serde_json::from_str(&raw_state)?;
    self.start(
        persisted.into_iter().map(Into::into).collect(),
        StartMode::AddToDefinitions,
    )
    .await?;
    Ok(())
}

/// Writes the currently known definitions to `networks_state_file` as a JSON
/// array of `FSDefinition`.
///
/// The definitions are snapshotted and serialized once; only the file write
/// is retried, up to five times with exponential back-off starting at 10 ms.
/// The file is created if missing and truncated before writing, so a shorter
/// payload never leaves stale bytes from a previous, longer one behind.
///
/// # Errors
///
/// Returns an error if serialization fails or if every write attempt fails.
pub(crate) async fn persist_defs(&self, networks_state_file: PathBuf) -> Result<(), Box<dyn Error>> {
    // Snapshot under the lock; the guard is released at the end of this
    // statement so it is not held across the blocking retries below.
    let fs_defs: Vec<FSDefinition> = self
        .definitions
        .lock()
        .await
        .values()
        .cloned()
        .map(|running_def| running_def.definition.into())
        .collect();

    // Serialization does not depend on the file system, so do it once
    // instead of on every retry attempt.
    let serialized = serde_json::to_string(&fs_defs)?;

    retry::retry(retry::delay::Exponential::from_millis(10).take(5), || {
        std::fs::OpenOptions::new()
            .create(true)
            .write(true)
            // Discard previous content so the file holds exactly one JSON document.
            .truncate(true)
            .open(networks_state_file.as_path())
            .and_then(|mut file| {
                file.write_all(serialized.as_bytes())?;
                file.flush()
            })
    })?;
    Ok(())
}

async fn start_inner(
&self,
existing: &mut BTreeMap<String, RunningDefinition>,
Expand Down
20 changes: 20 additions & 0 deletions rs/ic-observability/multiservice-discovery/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ fn main() {
}
} else {
let supervisor = DefinitionsSupervisor::new(rt.handle().clone(), cli_args.start_without_mainnet);
if let Some(networks_state_file) = cli_args.networks_state_file.clone() {
rt.block_on(supervisor.load_or_create_defs(networks_state_file)).unwrap();
}

let (server_stop, server_stop_receiver) = oneshot::channel();

//Configure server
Expand Down Expand Up @@ -98,6 +102,11 @@ fn main() {
// Signal server to stop. Stop happens in parallel with supervisor stop.
server_stop.send(()).unwrap();

// Persist definitions to disk before ending the supervisor.
if let Some(networks_state_file) = cli_args.networks_state_file.clone() {
rt.block_on(supervisor.persist_defs(networks_state_file)).unwrap();
}

//Stop all definitions. End happens in parallel with server stop.
rt.block_on(supervisor.end());

Expand Down Expand Up @@ -180,4 +189,15 @@ the Prometheus targets of mainnet as a JSON structure on stdout.
"#
)]
render_prom_targets_to_stdout: bool,

#[clap(
long = "networks-state-file",
default_value = None,
action,
help = r#"
Preload networks definitions from file path. In case the file does not
exist, it will be created.
"#
)]
networks_state_file: Option<PathBuf>,
}

0 comments on commit 73ac4cf

Please sign in to comment.