Skip to content

Commit b5fa872

Browse files
Provide a memory StateBackendClient (#523)
* Rename StateBackend::Standalone to StateBackend:Sled * Copy utility files from sled crate since they cannot be used directly * Provide a memory StateBackendClient * Fix dashmap deadlock issue * Fix for the comments Co-authored-by: yangzhong <[email protected]>
1 parent 20087c1 commit b5fa872

15 files changed

+906
-48
lines changed

ballista/scheduler/scheduler_config_spec.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ doc = "Route for proxying flight results via scheduler. Should be of the form 'I
3333
abbr = "b"
3434
name = "config_backend"
3535
type = "ballista_scheduler::state::backend::StateBackend"
36-
doc = "The configuration backend for the scheduler, possible values: etcd, standalone. Default: standalone"
37-
default = "ballista_scheduler::state::backend::StateBackend::Standalone"
36+
doc = "The configuration backend for the scheduler, possible values: etcd, memory, sled. Default: sled"
37+
default = "ballista_scheduler::state::backend::StateBackend::Sled"
3838

3939
[[param]]
4040
abbr = "n"

ballista/scheduler/src/main.rs

+10-12
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use anyhow::{Context, Result};
2121
#[cfg(feature = "flight-sql")]
2222
use arrow_flight::flight_service_server::FlightServiceServer;
2323
use ballista_scheduler::scheduler_server::externalscaler::external_scaler_server::ExternalScalerServer;
24+
use ballista_scheduler::state::backend::memory::MemoryBackendClient;
2425
use futures::future::{self, Either, TryFutureExt};
2526
use hyper::{server::conn::AddrStream, service::make_service_fn, Server};
2627
use std::convert::Infallible;
@@ -37,7 +38,7 @@ use ballista_scheduler::api::{get_routes, EitherBody, Error};
3738
#[cfg(feature = "etcd")]
3839
use ballista_scheduler::state::backend::etcd::EtcdClient;
3940
#[cfg(feature = "sled")]
40-
use ballista_scheduler::state::backend::standalone::StandaloneClient;
41+
use ballista_scheduler::state::backend::sled::SledClient;
4142
use datafusion_proto::protobuf::LogicalPlanNode;
4243

4344
use ballista_scheduler::scheduler_server::SchedulerServer;
@@ -211,10 +212,6 @@ async fn main() -> Result<()> {
211212
let addr = addr.parse()?;
212213

213214
let config_backend: Arc<dyn StateBackendClient> = match opt.config_backend {
214-
#[cfg(not(any(feature = "sled", feature = "etcd")))]
215-
_ => std::compile_error!(
216-
"To build the scheduler enable at least one config backend feature (`etcd` or `sled`)"
217-
),
218215
#[cfg(feature = "etcd")]
219216
StateBackend::Etcd => {
220217
let etcd = etcd_client::Client::connect(&[opt.etcd_urls], None)
@@ -229,26 +226,27 @@ async fn main() -> Result<()> {
229226
)
230227
}
231228
#[cfg(feature = "sled")]
232-
StateBackend::Standalone => {
229+
StateBackend::Sled => {
233230
if opt.sled_dir.is_empty() {
234231
Arc::new(
235-
StandaloneClient::try_new_temporary()
236-
.context("Could not create standalone config backend")?,
232+
SledClient::try_new_temporary()
233+
.context("Could not create sled config backend")?,
237234
)
238235
} else {
239236
println!("{}", opt.sled_dir);
240237
Arc::new(
241-
StandaloneClient::try_new(opt.sled_dir)
242-
.context("Could not create standalone config backend")?,
238+
SledClient::try_new(opt.sled_dir)
239+
.context("Could not create sled config backend")?,
243240
)
244241
}
245242
}
246243
#[cfg(not(feature = "sled"))]
247-
StateBackend::Standalone => {
244+
StateBackend::Sled => {
248245
unimplemented!(
249-
"build the scheduler with the `sled` feature to use the standalone config backend"
246+
"build the scheduler with the `sled` feature to use the sled config backend"
250247
)
251248
}
249+
StateBackend::Memory => Arc::new(MemoryBackendClient::new()),
252250
};
253251

254252
let config = SchedulerConfig {

ballista/scheduler/src/scheduler_server/grpc.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -587,13 +587,13 @@ mod test {
587587
use ballista_core::utils::default_session_builder;
588588

589589
use crate::state::executor_manager::DEFAULT_EXECUTOR_TIMEOUT_SECONDS;
590-
use crate::state::{backend::standalone::StandaloneClient, SchedulerState};
590+
use crate::state::{backend::sled::SledClient, SchedulerState};
591591

592592
use super::{SchedulerGrpc, SchedulerServer};
593593

594594
#[tokio::test]
595595
async fn test_poll_work() -> Result<(), BallistaError> {
596-
let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
596+
let state_storage = Arc::new(SledClient::try_new_temporary()?);
597597
let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
598598
SchedulerServer::new(
599599
"localhost:50050".to_owned(),
@@ -680,7 +680,7 @@ mod test {
680680

681681
#[tokio::test]
682682
async fn test_stop_executor() -> Result<(), BallistaError> {
683-
let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
683+
let state_storage = Arc::new(SledClient::try_new_temporary()?);
684684
let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
685685
SchedulerServer::new(
686686
"localhost:50050".to_owned(),
@@ -761,7 +761,7 @@ mod test {
761761
#[tokio::test]
762762
#[ignore]
763763
async fn test_expired_executor() -> Result<(), BallistaError> {
764-
let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
764+
let state_storage = Arc::new(SledClient::try_new_temporary()?);
765765
let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
766766
SchedulerServer::new(
767767
"localhost:50050".to_owned(),

ballista/scheduler/src/scheduler_server/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ mod test {
330330
use ballista_core::serde::BallistaCodec;
331331

332332
use crate::scheduler_server::{timestamp_millis, SchedulerServer};
333-
use crate::state::backend::standalone::StandaloneClient;
333+
use crate::state::backend::sled::SledClient;
334334

335335
use crate::test_utils::{
336336
assert_completed_event, assert_failed_event, assert_no_submitted_event,
@@ -598,7 +598,7 @@ mod test {
598598
async fn test_scheduler(
599599
scheduling_policy: TaskSchedulingPolicy,
600600
) -> Result<SchedulerServer<LogicalPlanNode, PhysicalPlanNode>> {
601-
let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
601+
let state_storage = Arc::new(SledClient::try_new_temporary()?);
602602
let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
603603
SchedulerServer::new(
604604
"localhost:50050".to_owned(),

ballista/scheduler/src/standalone.rs

+2-4
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@
1717

1818
use crate::config::SchedulerConfig;
1919
use crate::metrics::default_metrics_collector;
20-
use crate::{
21-
scheduler_server::SchedulerServer, state::backend::standalone::StandaloneClient,
22-
};
20+
use crate::{scheduler_server::SchedulerServer, state::backend::sled::SledClient};
2321
use ballista_core::serde::protobuf::PhysicalPlanNode;
2422
use ballista_core::serde::BallistaCodec;
2523
use ballista_core::utils::create_grpc_server;
@@ -33,7 +31,7 @@ use std::{net::SocketAddr, sync::Arc};
3331
use tokio::net::TcpListener;
3432

3533
pub async fn new_standalone_scheduler() -> Result<SocketAddr> {
36-
let client = StandaloneClient::try_new_temporary()?;
34+
let client = SledClient::try_new_temporary()?;
3735

3836
let metrics_collector = default_metrics_collector()?;
3937

ballista/scheduler/src/state/backend/etcd.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use crate::state::backend::{
3434
Keyspace, Lock, Operation, StateBackendClient, Watch, WatchEvent,
3535
};
3636

37-
/// A [`StateBackendClient`] implementation that uses etcd to save cluster configuration.
37+
/// A [`StateBackendClient`] implementation that uses etcd to save cluster state.
3838
#[derive(Clone)]
3939
pub struct EtcdClient {
4040
namespace: String,

0 commit comments

Comments
 (0)