Cluster state refactor part 1 #560

Merged Jan 21, 2023 (20 commits)

Commits:
b606c77
Customize session builder
thinkharderdev Nov 14, 2022
a157581
Add setter for executor slots policy
thinkharderdev Nov 14, 2022
24d4830
Construct Executor with functions
thinkharderdev Nov 14, 2022
c615fce
Add queued and completed timestamps to successful job status
thinkharderdev Nov 14, 2022
5ad27c0
Add public methods to SchedulerServer
thinkharderdev Nov 15, 2022
490bda5
Public method for getting execution graph
thinkharderdev Nov 15, 2022
a802315
Public method for stage metrics
thinkharderdev Nov 15, 2022
ff96bcd
Use node-level local limit (#20)
thinkharderdev Oct 24, 2022
694f6e2
configure_me_codegen retroactively reserved on our `bind_host` parame…
Nov 17, 2022
91119e4
Add ClusterState trait
thinkharderdev Dec 9, 2022
18790f4
Merge remote-tracking branch 'cgx/cluster-state' into cluster-state-r…
thinkharderdev Dec 9, 2022
41f228c
Refactor slightly for clarity
thinkharderdev Dec 9, 2022
70e1bcf
Revert "Use node-level local limit (#20)"
thinkharderdev Dec 9, 2022
96a8c9d
Revert "Public method for stage metrics"
thinkharderdev Dec 9, 2022
081b224
Revert "Public method for getting execution graph"
thinkharderdev Dec 9, 2022
7ae9aaa
Revert "Add public methods to SchedulerServer"
thinkharderdev Dec 9, 2022
d34ecb5
Revert "Add queued and completed timestamps to successful job status"
thinkharderdev Dec 9, 2022
ee2c9d0
Revert "Construct Executor with functions"
thinkharderdev Dec 9, 2022
3948c81
Always forget the apache header
thinkharderdev Dec 9, 2022
7062743
Merge remote-tracking branch 'origin' into cluster-state-refactor
thinkharderdev Jan 20, 2023
Files changed:
4 changes: 2 additions & 2 deletions ballista/core/src/serde/generated/ballista.rs
@@ -1985,7 +1985,7 @@ pub mod executor_grpc_client {
pub mod scheduler_grpc_server {
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
use tonic::codegen::*;
/// Generated trait containing gRPC methods that should be implemented for use with SchedulerGrpcServer.
///Generated trait containing gRPC methods that should be implemented for use with SchedulerGrpcServer.
Contributor:
Is it necessary to remove the blank after the `///`?

Contributor (author):
No, not sure why it was removed in the generated code.

Contributor:
Can you remind me again why the generated code is checked in? Where did we land on that?

Contributor (author):
I think it was so the project played nicely with IDEs (some of which do not evaluate proc macros)

#[async_trait]
pub trait SchedulerGrpc: Send + Sync + 'static {
/// Executors must poll the scheduler for heartbeat and to receive tasks
@@ -2531,7 +2531,7 @@ pub mod scheduler_grpc_server {
pub mod executor_grpc_server {
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
use tonic::codegen::*;
/// Generated trait containing gRPC methods that should be implemented for use with ExecutorGrpcServer.
///Generated trait containing gRPC methods that should be implemented for use with ExecutorGrpcServer.
#[async_trait]
pub trait ExecutorGrpc: Send + Sync + 'static {
async fn launch_task(
7 changes: 7 additions & 0 deletions ballista/scheduler/scheduler_config_spec.toml
@@ -36,6 +36,13 @@ type = "ballista_scheduler::state::backend::StateBackend"
doc = "The configuration backend for the scheduler, possible values: etcd, memory, sled. Default: sled"
default = "ballista_scheduler::state::backend::StateBackend::Sled"

[[param]]
abbr = "c"
Contributor:
Do we need both config_backend and cluster_backend? Personally, I prefer cluster_backend

Contributor (author):
Right now, no. But once part 2 is done, the goal is to be able to have different backends for cluster state and config (e.g. job) state. Maybe we expose that in the second PR so it is not so confusing.

Contributor (author):
@yahoNanJing are you okay to merge this as is?

name = "cluster_backend"
type = "ballista_scheduler::state::backend::StateBackend"
doc = "The configuration backend for the scheduler cluster state, possible values: etcd, memory, sled. Default: sled"
default = "ballista_scheduler::state::backend::StateBackend::Sled"

[[param]]
abbr = "n"
name = "namespace"
50 changes: 34 additions & 16 deletions ballista/scheduler/src/bin/main.rs
@@ -45,6 +45,7 @@ mod config {

use ballista_core::config::LogRotationPolicy;
use ballista_scheduler::config::SchedulerConfig;
use ballista_scheduler::state::backend::cluster::DefaultClusterState;
use config::prelude::*;
use tracing_subscriber::EnvFilter;

@@ -60,6 +61,16 @@ async fn main() -> Result<()> {
std::process::exit(0);
}

let config_backend = init_kv_backend(&opt.config_backend, &opt).await?;

let cluster_state = if opt.cluster_backend == opt.config_backend {
Arc::new(DefaultClusterState::new(config_backend.clone()))
} else {
let cluster_kv_store = init_kv_backend(&opt.cluster_backend, &opt).await?;

Arc::new(DefaultClusterState::new(cluster_kv_store))
};

let special_mod_log_level = opt.log_level_setting;
let namespace = opt.namespace;
let external_host = opt.external_host;
@@ -110,13 +121,31 @@ async fn main() -> Result<()> {
let addr = format!("{}:{}", bind_host, port);
let addr = addr.parse()?;

let config_backend: Arc<dyn StateBackendClient> = match opt.config_backend {
let config = SchedulerConfig {
scheduling_policy: opt.scheduler_policy,
event_loop_buffer_size: opt.event_loop_buffer_size,
executor_slots_policy: opt.executor_slots_policy,
finished_job_data_clean_up_interval_seconds: opt
.finished_job_data_clean_up_interval_seconds,
finished_job_state_clean_up_interval_seconds: opt
.finished_job_state_clean_up_interval_seconds,
advertise_flight_sql_endpoint: opt.advertise_flight_sql_endpoint,
};
start_server(scheduler_name, config_backend, cluster_state, addr, config).await?;
Ok(())
}

async fn init_kv_backend(
backend: &StateBackend,
opt: &Config,
) -> Result<Arc<dyn StateBackendClient>> {
let cluster_backend: Arc<dyn StateBackendClient> = match backend {
#[cfg(feature = "etcd")]
StateBackend::Etcd => {
let etcd = etcd_client::Client::connect(&[opt.etcd_urls], None)
let etcd = etcd_client::Client::connect(&[opt.etcd_urls.clone()], None)
.await
.context("Could not connect to etcd")?;
Arc::new(EtcdClient::new(namespace.clone(), etcd))
Arc::new(EtcdClient::new(opt.namespace.clone(), etcd))
}
#[cfg(not(feature = "etcd"))]
StateBackend::Etcd => {
@@ -134,7 +163,7 @@ async fn main() -> Result<()> {
} else {
println!("{}", opt.sled_dir);
Arc::new(
SledClient::try_new(opt.sled_dir)
SledClient::try_new(opt.sled_dir.clone())
.context("Could not create sled config backend")?,
)
}
@@ -148,16 +177,5 @@ async fn main() -> Result<()> {
StateBackend::Memory => Arc::new(MemoryBackendClient::new()),
};

let config = SchedulerConfig {
scheduling_policy: opt.scheduler_policy,
event_loop_buffer_size: opt.event_loop_buffer_size,
executor_slots_policy: opt.executor_slots_policy,
finished_job_data_clean_up_interval_seconds: opt
.finished_job_data_clean_up_interval_seconds,
finished_job_state_clean_up_interval_seconds: opt
.finished_job_state_clean_up_interval_seconds,
advertise_flight_sql_endpoint: opt.advertise_flight_sql_endpoint,
};
start_server(scheduler_name, config_backend, addr, config).await?;
Ok(())
Ok(cluster_backend)
}
5 changes: 5 additions & 0 deletions ballista/scheduler/src/config.rs
@@ -90,6 +90,11 @@ impl SchedulerConfig {
self.advertise_flight_sql_endpoint = endpoint;
self
}

pub fn with_executor_slots_policy(mut self, policy: SlotsPolicy) -> Self {
self.executor_slots_policy = policy;
self
}
}

// an enum used to configure the executor slots policy
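For illustration, the new setter chains with the existing builder-style methods on SchedulerConfig; a minimal sketch (import paths and the enum variants shown are assumptions, not taken from this diff):

// Sketch only: variant names and import paths are illustrative.
use ballista_core::config::TaskSchedulingPolicy;
use ballista_scheduler::config::{SchedulerConfig, SlotsPolicy};

let config = SchedulerConfig::default()
    .with_scheduler_policy(TaskSchedulingPolicy::PushStaged)
    .with_executor_slots_policy(SlotsPolicy::RoundRobin);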
3 changes: 3 additions & 0 deletions ballista/scheduler/src/scheduler_process.rs
@@ -41,11 +41,13 @@ use crate::flight_sql::FlightSqlServiceImpl;
use crate::metrics::default_metrics_collector;
use crate::scheduler_server::externalscaler::external_scaler_server::ExternalScalerServer;
use crate::scheduler_server::SchedulerServer;
use crate::state::backend::cluster::ClusterState;
use crate::state::backend::StateBackendClient;

pub async fn start_server(
scheduler_name: String,
config_backend: Arc<dyn StateBackendClient>,
cluster_state: Arc<dyn ClusterState>,
addr: SocketAddr,
config: SchedulerConfig,
) -> Result<()> {
@@ -65,6 +67,7 @@ pub async fn start_server(
SchedulerServer::new(
scheduler_name,
config_backend.clone(),
cluster_state,
BallistaCodec::default(),
config,
metrics_collector,
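With the new parameter, wiring up a scheduler looks roughly like the standalone setup further down; a minimal sketch (imports from this PR assumed, error type assumed to be BallistaError):

// Sketch only: mirrors standalone.rs below, with both the config backend and
// the cluster state built from the same temporary sled store.
async fn run_scheduler() -> Result<(), BallistaError> {
    let backend = Arc::new(SledClient::try_new_temporary()?);
    let cluster_state = Arc::new(DefaultClusterState::new(backend.clone()));
    let addr: SocketAddr = "0.0.0.0:50050".parse().expect("valid socket address");
    start_server(
        "localhost:50050".to_owned(),
        backend,
        cluster_state,
        addr,
        SchedulerConfig::default(),
    )
    .await?;
    Ok(())
}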
13 changes: 11 additions & 2 deletions ballista/scheduler/src/scheduler_server/grpc.rs
@@ -588,6 +588,7 @@ mod test {
use ballista_core::serde::BallistaCodec;
use ballista_core::utils::default_session_builder;

use crate::state::backend::cluster::DefaultClusterState;
use crate::state::executor_manager::DEFAULT_EXECUTOR_TIMEOUT_SECONDS;
use crate::state::{backend::sled::SledClient, SchedulerState};

@@ -596,10 +597,12 @@
#[tokio::test]
async fn test_poll_work() -> Result<(), BallistaError> {
let state_storage = Arc::new(SledClient::try_new_temporary()?);
let cluster_state = Arc::new(DefaultClusterState::new(state_storage.clone()));
let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
SchedulerServer::new(
"localhost:50050".to_owned(),
state_storage.clone(),
cluster_state.clone(),
BallistaCodec::default(),
SchedulerConfig::default(),
default_metrics_collector().unwrap(),
@@ -627,6 +630,7 @@
let state: SchedulerState<LogicalPlanNode, PhysicalPlanNode> =
SchedulerState::new_with_default_scheduler_name(
state_storage.clone(),
cluster_state.clone(),
default_session_builder,
BallistaCodec::default(),
);
@@ -660,6 +664,7 @@
let state: SchedulerState<LogicalPlanNode, PhysicalPlanNode> =
SchedulerState::new_with_default_scheduler_name(
state_storage.clone(),
cluster_state,
default_session_builder,
BallistaCodec::default(),
);
@@ -683,10 +688,12 @@
#[tokio::test]
async fn test_stop_executor() -> Result<(), BallistaError> {
let state_storage = Arc::new(SledClient::try_new_temporary()?);
let cluster_state = Arc::new(DefaultClusterState::new(state_storage.clone()));
let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
SchedulerServer::new(
"localhost:50050".to_owned(),
state_storage.clone(),
state_storage,
cluster_state,
BallistaCodec::default(),
SchedulerConfig::default(),
default_metrics_collector().unwrap(),
@@ -764,10 +771,12 @@
#[ignore]
async fn test_expired_executor() -> Result<(), BallistaError> {
let state_storage = Arc::new(SledClient::try_new_temporary()?);
let cluster_state = Arc::new(DefaultClusterState::new(state_storage.clone()));
let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
SchedulerServer::new(
"localhost:50050".to_owned(),
state_storage.clone(),
state_storage,
cluster_state,
BallistaCodec::default(),
SchedulerConfig::default(),
default_metrics_collector().unwrap(),
44 changes: 43 additions & 1 deletion ballista/scheduler/src/scheduler_server/mod.rs
@@ -35,6 +35,7 @@ use log::{error, warn};

use crate::scheduler_server::event::QueryStageSchedulerEvent;
use crate::scheduler_server::query_stage_scheduler::QueryStageScheduler;
use crate::state::backend::cluster::ClusterState;
use crate::state::backend::StateBackendClient;
use crate::state::executor_manager::{
ExecutorManager, ExecutorReservation, DEFAULT_EXECUTOR_TIMEOUT_SECONDS,
@@ -69,12 +70,14 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
pub fn new(
scheduler_name: String,
config_backend: Arc<dyn StateBackendClient>,
cluster_state: Arc<dyn ClusterState>,
codec: BallistaCodec<T, U>,
config: SchedulerConfig,
metrics_collector: Arc<dyn SchedulerMetricsCollector>,
) -> Self {
let state = Arc::new(SchedulerState::new(
config_backend,
cluster_state,
default_session_builder,
codec,
scheduler_name.clone(),
@@ -97,17 +100,53 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
}
}

pub fn with_session_builder(
Contributor:
Probably not part of this PR, but I think it's wildly important we switch to struct update syntax or a builder pattern to avoid the combinatorial explosion of new_with_xxx_and_yyy() methods which are a constant source of conflicts for me.

I did an issue search; I think it's covered by #479?
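For illustration, the struct update syntax mentioned here could cut down on constructor variants; a hypothetical sketch against SchedulerConfig (whose fields are public, as the struct literal in main.rs shows; the variant name is assumed):

// Sketch only, not part of this PR: override one field, default the rest.
let config = SchedulerConfig {
    executor_slots_policy: SlotsPolicy::RoundRobin,
    ..SchedulerConfig::default()
};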

scheduler_name: String,
config_backend: Arc<dyn StateBackendClient>,
cluster_backend: Arc<dyn ClusterState>,
codec: BallistaCodec<T, U>,
config: SchedulerConfig,
session_builder: SessionBuilder,
metrics_collector: Arc<dyn SchedulerMetricsCollector>,
) -> Self {
let state = Arc::new(SchedulerState::new(
config_backend,
cluster_backend,
session_builder,
codec,
scheduler_name.clone(),
config.clone(),
));
let query_stage_scheduler =
Arc::new(QueryStageScheduler::new(state.clone(), metrics_collector));
let query_stage_event_loop = EventLoop::new(
"query_stage".to_owned(),
config.event_loop_buffer_size as usize,
query_stage_scheduler.clone(),
);

Self {
scheduler_name,
start_time: timestamp_millis() as u128,
state,
query_stage_event_loop,
query_stage_scheduler,
}
}

#[allow(dead_code)]
pub(crate) fn with_task_launcher(
scheduler_name: String,
config_backend: Arc<dyn StateBackendClient>,
cluster_backend: Arc<dyn ClusterState>,
codec: BallistaCodec<T, U>,
config: SchedulerConfig,
metrics_collector: Arc<dyn SchedulerMetricsCollector>,
task_launcher: Arc<dyn TaskLauncher>,
) -> Self {
let state = Arc::new(SchedulerState::with_task_launcher(
config_backend,
cluster_backend,
default_session_builder,
codec,
scheduler_name.clone(),
@@ -330,6 +369,7 @@ mod test {
use ballista_core::serde::BallistaCodec;

use crate::scheduler_server::{timestamp_millis, SchedulerServer};
use crate::state::backend::cluster::DefaultClusterState;
use crate::state::backend::sled::SledClient;

use crate::test_utils::{
@@ -599,10 +639,12 @@
scheduling_policy: TaskSchedulingPolicy,
) -> Result<SchedulerServer<LogicalPlanNode, PhysicalPlanNode>> {
let state_storage = Arc::new(SledClient::try_new_temporary()?);
let cluster_state = Arc::new(DefaultClusterState::new(state_storage.clone()));
let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
SchedulerServer::new(
"localhost:50050".to_owned(),
state_storage.clone(),
state_storage,
cluster_state,
BallistaCodec::default(),
SchedulerConfig::default().with_scheduler_policy(scheduling_policy),
Arc::new(TestMetricsCollector::default()),
6 changes: 4 additions & 2 deletions ballista/scheduler/src/standalone.rs
@@ -17,6 +17,7 @@

use crate::config::SchedulerConfig;
use crate::metrics::default_metrics_collector;
use crate::state::backend::cluster::DefaultClusterState;
use crate::{scheduler_server::SchedulerServer, state::backend::sled::SledClient};
use ballista_core::serde::protobuf::PhysicalPlanNode;
use ballista_core::serde::BallistaCodec;
@@ -31,14 +32,15 @@ use std::{net::SocketAddr, sync::Arc};
use tokio::net::TcpListener;

pub async fn new_standalone_scheduler() -> Result<SocketAddr> {
let client = SledClient::try_new_temporary()?;
let backend = Arc::new(SledClient::try_new_temporary()?);

let metrics_collector = default_metrics_collector()?;

let mut scheduler_server: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
SchedulerServer::new(
"localhost:50050".to_owned(),
Arc::new(client),
backend.clone(),
Arc::new(DefaultClusterState::new(backend)),
BallistaCodec::default(),
SchedulerConfig::default(),
metrics_collector,