Skip to content

Commit e7f8774

Browse files
thinkharderdevmpurins-coralogixBrent Gardner
authored
Cluster state refactor Part 2 (#658)
* Customize session builder * Add setter for executor slots policy * Construct Executor with functions * Add queued and completed timestamps to successful job status * Add public methods to SchedulerServer * Public method for getting execution graph * Public method for stage metrics * Use node-level local limit (#20) * Use node-level local limit * serialize limit in shuffle writer * Revert "Merge pull request #19 from coralogix/sc-5792" This reverts commit 08140ef, reversing changes made to a7f1384. * add log * make sure we don't forget limit for shuffle writer * update accum correctly and try to break early * Check local limit accumulator before polling for more data * fix build Co-authored-by: Martins Purins <[email protected]> * configure_me_codegen retroactively reserved on our `bind_host` parame… (#520) * configure_me_codegen retroactively reserved on our `bind_host` parameter name * Add label and pray * Add more labels why not * Add ClusterState trait * Refactor slightly for clarity * Revert "Use node-level local limit (#20)" This reverts commit ff96bcd. * Revert "Public method for stage metrics" This reverts commit a802315. * Revert "Public method for getting execution graph" This reverts commit 490bda5. * Revert "Add public methods to SchedulerServer" This reverts commit 5ad27c0. * Revert "Add queued and completed timestamps to successful job status" This reverts commit c615fce. * Revert "Construct Executor with functions" This reverts commit 24d4830. * Always forget the apache header * WIP * Implement JobState * Tests and fixes * do not hold ref across await point * Fix clippy warnings * Fix tomlfmt github action * uncomment test --------- Co-authored-by: Martins Purins <[email protected]> Co-authored-by: Brent Gardner <[email protected]>
1 parent 8f8154f commit e7f8774

35 files changed

+3592
-2587
lines changed

.github/workflows/rust.yml

+4-1
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,10 @@ jobs:
402402
#
403403
# ignore ./Cargo.toml because putting workspaces in multi-line lists make it easy to read
404404
ci/scripts/rust_toml_fmt.sh
405-
git diff --exit-code
405+
if test -f "./Cargo.toml.bak"; then
406+
echo "cargo tomlfmt found format violations"
407+
exit 1
408+
fi
406409
env:
407410
CARGO_HOME: "/github/home/.cargo"
408411
CARGO_TARGET_DIR: "/github/home/target"

ballista/core/proto/ballista.proto

+27-2
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,7 @@ message ExecutorMetadata {
289289
ExecutorSpecification specification = 5;
290290
}
291291

292+
292293
// Used by grpc
293294
message ExecutorRegistration {
294295
string id = 1;
@@ -336,6 +337,15 @@ message ExecutorResource {
336337
}
337338
}
338339

340+
message AvailableTaskSlots {
341+
string executor_id = 1;
342+
uint32 slots = 2;
343+
}
344+
345+
message ExecutorTaskSlots {
346+
repeated AvailableTaskSlots task_slots = 1;
347+
}
348+
339349
message ExecutorData {
340350
string executor_id = 1;
341351
repeated ExecutorResourcePair resources = 2;
@@ -544,18 +554,33 @@ message GetJobStatusParams {
544554

545555
message SuccessfulJob {
546556
repeated PartitionLocation partition_location = 1;
557+
uint64 queued_at = 2;
558+
uint64 started_at = 3;
559+
uint64 ended_at = 4;
547560
}
548561

549-
message QueuedJob {}
562+
message QueuedJob {
563+
uint64 queued_at = 1;
564+
}
550565

551566
// TODO: add progress report
552-
message RunningJob {}
567+
message RunningJob {
568+
uint64 queued_at = 1;
569+
uint64 started_at = 2;
570+
string scheduler = 3;
571+
}
553572

554573
message FailedJob {
555574
string error = 1;
575+
uint64 queued_at = 2;
576+
uint64 started_at = 3;
577+
uint64 ended_at = 4;
556578
}
557579

558580
message JobStatus {
581+
string job_id = 5;
582+
string job_name = 6;
583+
559584
oneof status {
560585
QueuedJob queued = 1;
561586
RunningJob running = 2;

ballista/core/src/serde/generated/ballista.rs

+42-2
Original file line numberDiff line numberDiff line change
@@ -581,6 +581,20 @@ pub mod executor_resource {
581581
}
582582
#[allow(clippy::derive_partial_eq_without_eq)]
583583
#[derive(Clone, PartialEq, ::prost::Message)]
584+
pub struct AvailableTaskSlots {
585+
#[prost(string, tag = "1")]
586+
pub executor_id: ::prost::alloc::string::String,
587+
#[prost(uint32, tag = "2")]
588+
pub slots: u32,
589+
}
590+
#[allow(clippy::derive_partial_eq_without_eq)]
591+
#[derive(Clone, PartialEq, ::prost::Message)]
592+
pub struct ExecutorTaskSlots {
593+
#[prost(message, repeated, tag = "1")]
594+
pub task_slots: ::prost::alloc::vec::Vec<AvailableTaskSlots>,
595+
}
596+
#[allow(clippy::derive_partial_eq_without_eq)]
597+
#[derive(Clone, PartialEq, ::prost::Message)]
584598
pub struct ExecutorData {
585599
#[prost(string, tag = "1")]
586600
pub executor_id: ::prost::alloc::string::String,
@@ -933,23 +947,49 @@ pub struct GetJobStatusParams {
933947
pub struct SuccessfulJob {
934948
#[prost(message, repeated, tag = "1")]
935949
pub partition_location: ::prost::alloc::vec::Vec<PartitionLocation>,
950+
#[prost(uint64, tag = "2")]
951+
pub queued_at: u64,
952+
#[prost(uint64, tag = "3")]
953+
pub started_at: u64,
954+
#[prost(uint64, tag = "4")]
955+
pub ended_at: u64,
936956
}
937957
#[allow(clippy::derive_partial_eq_without_eq)]
938958
#[derive(Clone, PartialEq, ::prost::Message)]
939-
pub struct QueuedJob {}
959+
pub struct QueuedJob {
960+
#[prost(uint64, tag = "1")]
961+
pub queued_at: u64,
962+
}
940963
/// TODO: add progress report
941964
#[allow(clippy::derive_partial_eq_without_eq)]
942965
#[derive(Clone, PartialEq, ::prost::Message)]
943-
pub struct RunningJob {}
966+
pub struct RunningJob {
967+
#[prost(uint64, tag = "1")]
968+
pub queued_at: u64,
969+
#[prost(uint64, tag = "2")]
970+
pub started_at: u64,
971+
#[prost(string, tag = "3")]
972+
pub scheduler: ::prost::alloc::string::String,
973+
}
944974
#[allow(clippy::derive_partial_eq_without_eq)]
945975
#[derive(Clone, PartialEq, ::prost::Message)]
946976
pub struct FailedJob {
947977
#[prost(string, tag = "1")]
948978
pub error: ::prost::alloc::string::String,
979+
#[prost(uint64, tag = "2")]
980+
pub queued_at: u64,
981+
#[prost(uint64, tag = "3")]
982+
pub started_at: u64,
983+
#[prost(uint64, tag = "4")]
984+
pub ended_at: u64,
949985
}
950986
#[allow(clippy::derive_partial_eq_without_eq)]
951987
#[derive(Clone, PartialEq, ::prost::Message)]
952988
pub struct JobStatus {
989+
#[prost(string, tag = "5")]
990+
pub job_id: ::prost::alloc::string::String,
991+
#[prost(string, tag = "6")]
992+
pub job_name: ::prost::alloc::string::String,
953993
#[prost(oneof = "job_status::Status", tags = "1, 2, 3, 4")]
954994
pub status: ::core::option::Option<job_status::Status>,
955995
}

ballista/core/src/serde/mod.rs

+7-4
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,13 @@ use datafusion::execution::FunctionRegistry;
2626
use datafusion::physical_plan::{ExecutionPlan, Partitioning};
2727
use datafusion_proto::common::proto_error;
2828
use datafusion_proto::physical_plan::from_proto::parse_protobuf_hash_partitioning;
29+
use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};
2930
use datafusion_proto::{
3031
convert_required,
3132
logical_plan::{AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec},
3233
physical_plan::{AsExecutionPlan, PhysicalExtensionCodec},
3334
};
35+
3436
use prost::Message;
3537
use std::fmt::Debug;
3638
use std::marker::PhantomData;
@@ -69,16 +71,17 @@ pub fn decode_protobuf(bytes: &[u8]) -> Result<BallistaAction, BallistaError> {
6971
}
7072

7173
#[derive(Clone, Debug)]
72-
pub struct BallistaCodec<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> {
74+
pub struct BallistaCodec<
75+
T: 'static + AsLogicalPlan = LogicalPlanNode,
76+
U: 'static + AsExecutionPlan = PhysicalPlanNode,
77+
> {
7378
logical_extension_codec: Arc<dyn LogicalExtensionCodec>,
7479
physical_extension_codec: Arc<dyn PhysicalExtensionCodec>,
7580
logical_plan_repr: PhantomData<T>,
7681
physical_plan_repr: PhantomData<U>,
7782
}
7883

79-
impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> Default
80-
for BallistaCodec<T, U>
81-
{
84+
impl Default for BallistaCodec {
8285
fn default() -> Self {
8386
Self {
8487
logical_extension_codec: Arc::new(DefaultLogicalExtensionCodec {}),

ballista/scheduler/scheduler_config_spec.toml

+2-9
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,10 @@ doc = "Route for proxying flight results via scheduler. Should be of the form 'I
3131

3232
[[param]]
3333
abbr = "b"
34-
name = "config_backend"
35-
type = "ballista_scheduler::state::backend::StateBackend"
36-
doc = "The configuration backend for the scheduler, possible values: etcd, memory, sled. Default: sled"
37-
default = "ballista_scheduler::state::backend::StateBackend::Sled"
38-
39-
[[param]]
40-
abbr = "c"
4134
name = "cluster_backend"
42-
type = "ballista_scheduler::state::backend::StateBackend"
35+
type = "ballista_scheduler::cluster::ClusterStorage"
4336
doc = "The configuration backend for the scheduler cluster state, possible values: etcd, memory, sled. Default: sled"
44-
default = "ballista_scheduler::state::backend::StateBackend::Sled"
37+
default = "ballista_scheduler::cluster::ClusterStorage::Sled"
4538

4639
[[param]]
4740
abbr = "n"

ballista/scheduler/src/api/handlers.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ pub(crate) async fn get_query_stages<T: AsLogicalPlan, U: AsExecutionPlan>(
209209
{
210210
Ok(warp::reply::json(&QueryStagesResponse {
211211
stages: graph
212+
.as_ref()
212213
.stages()
213214
.iter()
214215
.map(|(id, stage)| {
@@ -303,7 +304,7 @@ pub(crate) async fn get_job_dot_graph<T: AsLogicalPlan, U: AsExecutionPlan>(
303304
.await
304305
.map_err(|_| warp::reject())?
305306
{
306-
ExecutionGraphDot::generate(graph).map_err(|_| warp::reject())
307+
ExecutionGraphDot::generate(graph.as_ref()).map_err(|_| warp::reject())
307308
} else {
308309
Ok("Not Found".to_string())
309310
}
@@ -322,7 +323,7 @@ pub(crate) async fn get_query_stage_dot_graph<T: AsLogicalPlan, U: AsExecutionPl
322323
.await
323324
.map_err(|_| warp::reject())?
324325
{
325-
ExecutionGraphDot::generate_for_query_stage(graph, stage_id)
326+
ExecutionGraphDot::generate_for_query_stage(graph.as_ref(), stage_id)
326327
.map_err(|_| warp::reject())
327328
} else {
328329
Ok("Not Found".to_string())

ballista/scheduler/src/bin/main.rs

+21-76
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,18 @@
1717

1818
//! Ballista Rust scheduler binary.
1919
20-
use std::{env, io, sync::Arc};
20+
use std::{env, io};
2121

22-
use anyhow::{Context, Result};
22+
use anyhow::Result;
2323

2424
use ballista_core::print_version;
2525
use ballista_scheduler::scheduler_process::start_server;
26-
#[cfg(feature = "etcd")]
27-
use ballista_scheduler::state::backend::etcd::EtcdClient;
28-
use ballista_scheduler::state::backend::memory::MemoryBackendClient;
29-
#[cfg(feature = "sled")]
30-
use ballista_scheduler::state::backend::sled::SledClient;
31-
use ballista_scheduler::state::backend::{StateBackend, StateBackendClient};
26+
27+
use crate::config::{Config, ResultExt};
28+
use ballista_core::config::LogRotationPolicy;
29+
use ballista_scheduler::cluster::BallistaCluster;
30+
use ballista_scheduler::config::{ClusterStorageConfig, SchedulerConfig};
31+
use tracing_subscriber::EnvFilter;
3232

3333
#[macro_use]
3434
extern crate configure_me;
@@ -43,12 +43,6 @@ mod config {
4343
));
4444
}
4545

46-
use ballista_core::config::LogRotationPolicy;
47-
use ballista_scheduler::config::SchedulerConfig;
48-
use ballista_scheduler::state::backend::cluster::DefaultClusterState;
49-
use config::prelude::*;
50-
use tracing_subscriber::EnvFilter;
51-
5246
#[tokio::main]
5347
async fn main() -> Result<()> {
5448
// parse options
@@ -61,25 +55,14 @@ async fn main() -> Result<()> {
6155
std::process::exit(0);
6256
}
6357

64-
let config_backend = init_kv_backend(&opt.config_backend, &opt).await?;
65-
66-
let cluster_state = if opt.cluster_backend == opt.config_backend {
67-
Arc::new(DefaultClusterState::new(config_backend.clone()))
68-
} else {
69-
let cluster_kv_store = init_kv_backend(&opt.cluster_backend, &opt).await?;
70-
71-
Arc::new(DefaultClusterState::new(cluster_kv_store))
72-
};
73-
7458
let special_mod_log_level = opt.log_level_setting;
75-
let namespace = opt.namespace;
76-
let external_host = opt.external_host;
77-
let bind_host = opt.bind_host;
78-
let port = opt.bind_port;
7959
let log_dir = opt.log_dir;
8060
let print_thread_info = opt.print_thread_info;
81-
let log_file_name_prefix = format!("scheduler_{namespace}_{external_host}_{port}");
82-
let scheduler_name = format!("{external_host}:{port}");
61+
62+
let log_file_name_prefix = format!(
63+
"scheduler_{}_{}_{}",
64+
opt.namespace, opt.external_host, opt.bind_port
65+
);
8366

8467
let rust_log = env::var(EnvFilter::DEFAULT_ENV);
8568
let log_filter = EnvFilter::new(rust_log.unwrap_or(special_mod_log_level));
@@ -117,10 +100,13 @@ async fn main() -> Result<()> {
117100
.init();
118101
}
119102

120-
let addr = format!("{bind_host}:{port}");
103+
let addr = format!("{}:{}", opt.bind_host, opt.bind_port);
121104
let addr = addr.parse()?;
122105

123106
let config = SchedulerConfig {
107+
namespace: opt.namespace,
108+
external_host: opt.external_host,
109+
bind_port: opt.bind_port,
124110
scheduling_policy: opt.scheduler_policy,
125111
event_loop_buffer_size: opt.event_loop_buffer_size,
126112
executor_slots_policy: opt.executor_slots_policy,
@@ -129,54 +115,13 @@ async fn main() -> Result<()> {
129115
finished_job_state_clean_up_interval_seconds: opt
130116
.finished_job_state_clean_up_interval_seconds,
131117
advertise_flight_sql_endpoint: opt.advertise_flight_sql_endpoint,
118+
cluster_storage: ClusterStorageConfig::Memory,
132119
job_resubmit_interval_ms: (opt.job_resubmit_interval_ms > 0)
133120
.then_some(opt.job_resubmit_interval_ms),
134121
};
135-
start_server(scheduler_name, config_backend, cluster_state, addr, config).await?;
136-
Ok(())
137-
}
138122

139-
async fn init_kv_backend(
140-
backend: &StateBackend,
141-
opt: &Config,
142-
) -> Result<Arc<dyn StateBackendClient>> {
143-
let cluster_backend: Arc<dyn StateBackendClient> = match backend {
144-
#[cfg(feature = "etcd")]
145-
StateBackend::Etcd => {
146-
let etcd = etcd_client::Client::connect(&[opt.etcd_urls.clone()], None)
147-
.await
148-
.context("Could not connect to etcd")?;
149-
Arc::new(EtcdClient::new(opt.namespace.clone(), etcd))
150-
}
151-
#[cfg(not(feature = "etcd"))]
152-
StateBackend::Etcd => {
153-
unimplemented!(
154-
"build the scheduler with the `etcd` feature to use the etcd config backend"
155-
)
156-
}
157-
#[cfg(feature = "sled")]
158-
StateBackend::Sled => {
159-
if opt.sled_dir.is_empty() {
160-
Arc::new(
161-
SledClient::try_new_temporary()
162-
.context("Could not create sled config backend")?,
163-
)
164-
} else {
165-
println!("{}", opt.sled_dir);
166-
Arc::new(
167-
SledClient::try_new(opt.sled_dir.clone())
168-
.context("Could not create sled config backend")?,
169-
)
170-
}
171-
}
172-
#[cfg(not(feature = "sled"))]
173-
StateBackend::Sled => {
174-
unimplemented!(
175-
"build the scheduler with the `sled` feature to use the sled config backend"
176-
)
177-
}
178-
StateBackend::Memory => Arc::new(MemoryBackendClient::new()),
179-
};
123+
let cluster = BallistaCluster::new_from_config(&config).await?;
180124

181-
Ok(cluster_backend)
125+
start_server(cluster, addr, config).await?;
126+
Ok(())
182127
}

0 commit comments

Comments
 (0)