Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[aptos-workspace-server] various fixes and improvements #15571

Merged
merged 3 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions aptos-move/aptos-workspace-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ aptos-types = { workspace = true }
# third party deps
anyhow = { workspace = true }
bollard = { workspace = true }
clap = { workspace = true }
diesel = { workspace = true, features = [
"postgres_backend",
] }
Expand Down
35 changes: 32 additions & 3 deletions aptos-move/aptos-workspace-server/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,48 @@

use futures::{future::Shared, FutureExt};
use std::{
fmt::{self, Debug, Display},
future::Future,
net::{IpAddr, Ipv4Addr},
sync::Arc,
};

/// An wrapper to ensure propagation of chain of errors.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

Suggested change
/// An wrapper to ensure propagation of chain of errors.
/// A wrapper to ensure propagation of chain of errors.

pub(crate) struct ArcError(Arc<anyhow::Error>);

impl Clone for ArcError {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}

impl std::error::Error for ArcError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.0.source()
}
}

impl Display for ArcError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}

impl Debug for ArcError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self.0)
}
}

/// The local IP address services are bound to.
pub(crate) const IP_LOCAL_HOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));

/// Converts a future into a shared future by wrapping the error in an `Arc`.
pub(crate) fn make_shared<F, T, E>(fut: F) -> Shared<impl Future<Output = Result<T, Arc<E>>>>
pub(crate) fn make_shared<F, T>(fut: F) -> Shared<impl Future<Output = Result<T, ArcError>>>
where
T: Clone,
F: Future<Output = Result<T, E>>,
F: Future<Output = Result<T, anyhow::Error>>,
{
fut.map(|r| r.map_err(|err| Arc::new(err))).shared()
fut.map(|r| r.map_err(|err| ArcError(Arc::new(err))))
.shared()
}
60 changes: 46 additions & 14 deletions aptos-move/aptos-workspace-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,20 @@
mod common;
mod services;

use anyhow::{Context, Result};
use anyhow::{anyhow, Context, Result};
use aptos_localnet::docker::get_docker;
use clap::Parser;
use common::make_shared;
use futures::TryFutureExt;
use services::{
docker_common::create_docker_network, indexer_api::start_indexer_api,
docker_common::create_docker_network_permanent, indexer_api::start_indexer_api,
processors::start_all_processors,
};
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;

pub async fn run_all_services() -> Result<()> {
async fn run_all_services(timeout: u64) -> Result<()> {
let test_dir = tempfile::tempdir()?;
let test_dir = test_dir.path();
println!("Created test directory: {}", test_dir.display());
Expand All @@ -50,9 +53,15 @@ pub async fn run_all_services() -> Result<()> {
// waiting for it to trigger.
let shutdown = shutdown.clone();
tokio::spawn(async move {
tokio::signal::ctrl_c().await.unwrap();

println!("\nCtrl-C received. Shutting down services. This may take a while.\n");
tokio::select! {
res = tokio::signal::ctrl_c() => {
res.unwrap();
println!("\nCtrl-C received. Shutting down services. This may take a while.\n");
}
_ = tokio::time::sleep(Duration::from_secs(timeout)) => {
println!("\nTimeout reached. Shutting down services. This may take a while.\n");
}
}

shutdown.cancel();
});
Expand All @@ -72,15 +81,22 @@ pub async fn run_all_services() -> Result<()> {
fut_indexer_grpc.clone(),
);

// Docker
let fut_docker = make_shared(get_docker());

// Docker Network
let docker_network_name = format!("aptos-workspace-{}", instance_id);
let (fut_docker_network, fut_docker_network_clean_up) =
create_docker_network(shutdown.clone(), docker_network_name);
let docker_network_name = "aptos-workspace".to_string();
let fut_docker_network = make_shared(create_docker_network_permanent(
shutdown.clone(),
fut_docker.clone(),
docker_network_name,
));

// Indexer part 1: postgres db
let (fut_postgres, fut_postgres_finish, fut_postgres_clean_up) =
services::postgres::start_postgres(
shutdown.clone(),
fut_docker.clone(),
fut_docker_network.clone(),
instance_id,
);
Expand All @@ -98,6 +114,7 @@ pub async fn run_all_services() -> Result<()> {
let (fut_indexer_api, fut_indexer_api_finish, fut_indexer_api_clean_up) = start_indexer_api(
instance_id,
shutdown.clone(),
fut_docker.clone(),
fut_docker_network.clone(),
fut_postgres.clone(),
fut_all_processors_ready.clone(),
Expand All @@ -106,19 +123,18 @@ pub async fn run_all_services() -> Result<()> {
// Phase 2: Wait for all services to be up.
let all_services_up = async move {
tokio::try_join!(
fut_node_api.map_err(anyhow::Error::msg),
fut_indexer_grpc.map_err(anyhow::Error::msg),
fut_node_api.map_err(|err| anyhow!(err)),
fut_indexer_grpc.map_err(|err| anyhow!(err)),
fut_faucet,
fut_postgres.map_err(anyhow::Error::msg),
fut_all_processors_ready.map_err(anyhow::Error::msg),
fut_postgres.map_err(|err| anyhow!(err)),
fut_all_processors_ready.map_err(|err| anyhow!(err)),
fut_indexer_api,
)
};
let clean_up_all = async move {
eprintln!("Running shutdown steps");
fut_indexer_api_clean_up.await;
fut_postgres_clean_up.await;
fut_docker_network_clean_up.await;
};
tokio::select! {
_ = shutdown.cancelled() => {
Expand Down Expand Up @@ -181,3 +197,19 @@ pub async fn run_all_services() -> Result<()> {

Ok(())
}

#[derive(Parser)]
pub enum WorkspaceCommand {
Run {
#[arg(long, default_value_t = 1800)]
timeout: u64,
Comment on lines +204 to +205
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A comment and maybe better naming could be nice here. My first thought was that this would be a startup or shutdown timeout, but it really seems like a "max lifespan".

I don't fully know exactly how you use workspace, but I suppose there is a good reason that we always do this, vs making it optional?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also it'd be good to suffix whatever name we choose with _secs

},
}

impl WorkspaceCommand {
pub async fn run(self) -> Result<()> {
match self {
WorkspaceCommand::Run { timeout } => run_all_services(timeout).await,
}
}
}
7 changes: 4 additions & 3 deletions aptos-move/aptos-workspace-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use clap::Parser;

#[tokio::main]
async fn main() -> Result<()> {
aptos_workspace_server::run_all_services().await?;

Ok(())
aptos_workspace_server::WorkspaceCommand::parse()
.run()
.await
}
Loading
Loading