Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Atomic support for enhancement #319

Merged
merged 2 commits into from
Oct 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ballista/rust/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.8.0" }
chrono = { version = "0.4", default-features = false }
configure_me = "0.4.0"
dashmap = "5.4.0"
datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
futures = "0.3"
Expand Down
20 changes: 10 additions & 10 deletions ballista/rust/executor/src/executor_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::convert::TryInto;
use std::ops::Deref;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::{mpsc, RwLock};
use tokio::sync::mpsc;

use log::{debug, error, info, warn};
use tonic::transport::Channel;
Expand All @@ -45,6 +45,7 @@ use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
use ballista_core::utils::{
collect_plan_metrics, create_grpc_client_connection, create_grpc_server,
};
use dashmap::DashMap;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_proto::logical_plan::AsLogicalPlan;
Expand All @@ -57,7 +58,7 @@ use crate::shutdown::ShutdownNotifier;
use crate::{as_task_status, TaskExecutionTimes};

type ServerHandle = JoinHandle<Result<(), BallistaError>>;
type SchedulerClients = Arc<RwLock<HashMap<String, SchedulerGrpcClient<Channel>>>>;
type SchedulerClients = Arc<DashMap<String, SchedulerGrpcClient<Channel>>>;

/// Wrap TaskDefinition with its curator scheduler id for task update to its specific curator scheduler later
#[derive(Debug)]
Expand Down Expand Up @@ -216,10 +217,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
&self,
scheduler_id: &str,
) -> Result<SchedulerGrpcClient<Channel>, BallistaError> {
let scheduler = {
let schedulers = self.schedulers.read().await;
schedulers.get(scheduler_id).cloned()
};
let scheduler = self.schedulers.get(scheduler_id).map(|value| value.clone());
// If channel does not exist, create a new one
if let Some(scheduler) = scheduler {
Ok(scheduler)
Expand All @@ -229,8 +227,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
let scheduler = SchedulerGrpcClient::new(connection);

{
let mut schedulers = self.schedulers.write().await;
schedulers.insert(scheduler_id.to_owned(), scheduler.clone());
self.schedulers
.insert(scheduler_id.to_owned(), scheduler.clone());
}

Ok(scheduler)
Expand Down Expand Up @@ -263,8 +261,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
}
};

let schedulers = self.schedulers.read().await.clone();
for (scheduler_id, mut scheduler) in schedulers {
for mut item in self.schedulers.iter_mut() {
let scheduler_id = item.key().clone();
let scheduler = item.value_mut();

match scheduler
.heart_beat_from_executor(heartbeat_params.clone())
.await
Expand Down
1 change: 1 addition & 0 deletions ballista/rust/scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ ballista-core = { path = "../core", version = "0.8.0" }
base64 = { version = "0.13", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
configure_me = "0.4.0"
dashmap = "5.4.0"
datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
etcd-client = { version = "0.9", optional = true }
Expand Down
24 changes: 14 additions & 10 deletions ballista/rust/scheduler/src/state/backend/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ use etcd_client::{
use futures::{Stream, StreamExt};
use log::{debug, error, warn};

use crate::state::backend::{Keyspace, Lock, StateBackendClient, Watch, WatchEvent};
use crate::state::backend::{
Keyspace, Lock, Operation, StateBackendClient, Watch, WatchEvent,
};

/// A [`StateBackendClient`] implementation that uses etcd to save cluster configuration.
#[derive(Clone)]
Expand Down Expand Up @@ -137,29 +139,31 @@ impl StateBackendClient for EtcdClient {
.await
.map_err(|e| {
warn!("etcd put failed: {}", e);
ballista_error("etcd put failed")
ballista_error(&*format!("etcd put failed: {}", e))
})
.map(|_| ())
}

async fn put_txn(&self, ops: Vec<(Keyspace, String, Vec<u8>)>) -> Result<()> {
/// Apply multiple operations in a single transaction.
async fn apply_txn(&self, ops: Vec<(Operation, Keyspace, String)>) -> Result<()> {
let mut etcd = self.etcd.clone();

let txn_ops: Vec<TxnOp> = ops
.into_iter()
.map(|(ks, key, value)| {
.map(|(operation, ks, key)| {
let key = format!("/{}/{:?}/{}", self.namespace, ks, key);
TxnOp::put(key, value, None)
match operation {
Operation::Put(value) => TxnOp::put(key, value, None),
Operation::Delete => TxnOp::delete(key, None),
}
})
.collect();

let txn = Txn::new().and_then(txn_ops);

etcd.txn(txn)
etcd.txn(Txn::new().and_then(txn_ops))
.await
.map_err(|e| {
error!("etcd put failed: {}", e);
ballista_error("etcd transaction put failed")
error!("etcd operation failed: {}", e);
ballista_error(&*format!("etcd operation failed: {}", e))
})
.map(|_| ())
}
Expand Down
23 changes: 20 additions & 3 deletions ballista/rust/scheduler/src/state/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use ballista_core::error::Result;
use clap::ArgEnum;
use futures::Stream;
use futures::{future, Stream};
use std::collections::HashSet;
use std::fmt;
use tokio::sync::OwnedMutexGuard;
Expand Down Expand Up @@ -60,6 +60,12 @@ pub enum Keyspace {
Heartbeats,
}

/// A single mutation applied to one key as part of a transaction.
///
/// The keyspace and key the operation targets are supplied alongside it
/// (see the `(Operation, Keyspace, String)` tuples taken by `apply_txn`).
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub enum Operation {
    /// Store the given bytes under the target key.
    Put(Vec<u8>),
    /// Remove the target key.
    Delete,
}

/// A trait that contains the necessary methods to save and retrieve the state and configuration of a cluster.
#[tonic::async_trait]
pub trait StateBackendClient: Send + Sync {
Expand Down Expand Up @@ -90,8 +96,19 @@ pub trait StateBackendClient: Send + Sync {
/// Saves the value into the provided key, overriding any previous data that might have been associated to that key.
async fn put(&self, keyspace: Keyspace, key: String, value: Vec<u8>) -> Result<()>;

/// Save multiple values in a single transaction. Either all values should be saved, or all should fail
async fn put_txn(&self, ops: Vec<(Keyspace, String, Vec<u8>)>) -> Result<()>;
/// Bundle multiple operations into a single transaction. Either all operations are applied, or none are.
/// Operations of different types and on different keyspaces may be mixed. If the operations touch more than
/// one keyspace, a lock has to be acquired for each of them.
async fn apply_txn(&self, ops: Vec<(Operation, Keyspace, String)>) -> Result<()>;
/// Acquire the locks identified by `ids` and return them all.
///
/// The ids are sorted by their `"/{keyspace:?}/{key}"` string and the locks are then
/// taken one after another in that order.
///
/// # Errors
///
/// Returns the first error produced by `self.lock`. Locks obtained before the failure
/// are dropped with the partially filled vector (presumably releasing them; confirm
/// against the `Lock` implementations).
async fn acquire_locks(
    &self,
    mut ids: Vec<(Keyspace, &str)>,
) -> Result<Vec<Box<dyn Lock>>> {
    // We always acquire locks in a specific order to avoid deadlocks.
    // The cached variant builds each formatted key once instead of on every comparison.
    ids.sort_by_cached_key(|(ks, key)| format!("/{:?}/{}", ks, key));

    // Take the locks strictly one at a time. Polling all lock futures concurrently
    // would let a later lock be granted while an earlier one is still pending,
    // which defeats the ordering above and can deadlock two callers.
    let mut locks = Vec::with_capacity(ids.len());
    for (ks, key) in ids {
        locks.push(self.lock(ks, key).await?);
    }
    Ok(locks)
}

/// Atomically move the given key from one keyspace to another
async fn mv(
Expand Down
48 changes: 41 additions & 7 deletions ballista/rust/scheduler/src/state/backend/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ use log::warn;
use sled_package as sled;
use tokio::sync::Mutex;

use crate::state::backend::{Keyspace, Lock, StateBackendClient, Watch, WatchEvent};
use crate::state::backend::{
Keyspace, Lock, Operation, StateBackendClient, Watch, WatchEvent,
};

/// A [`StateBackendClient`] implementation that uses file-based storage to save cluster configuration.
#[derive(Clone)]
Expand Down Expand Up @@ -162,17 +164,20 @@ impl StateBackendClient for StandaloneClient {
.map(|_| ())
}

async fn put_txn(&self, ops: Vec<(Keyspace, String, Vec<u8>)>) -> Result<()> {
async fn apply_txn(&self, ops: Vec<(Operation, Keyspace, String)>) -> Result<()> {
let mut batch = sled::Batch::default();

for (ks, key, value) in ops {
let key = format!("/{:?}/{}", ks, key);
batch.insert(key.as_str(), value);
for (op, keyspace, key_str) in ops {
let key = format!("/{:?}/{}", &keyspace, key_str);
match op {
Operation::Put(value) => batch.insert(key.as_str(), value),
Operation::Delete => batch.remove(key.as_str()),
}
}

self.db.apply_batch(batch).map_err(|e| {
warn!("sled transaction insert failed: {}", e);
ballista_error("sled insert failed")
ballista_error("sled operations failed")
})
}

Expand Down Expand Up @@ -279,7 +284,8 @@ impl Stream for SledWatch {
mod tests {
use super::{StandaloneClient, StateBackendClient, Watch, WatchEvent};

use crate::state::backend::Keyspace;
use crate::state::backend::{Keyspace, Operation};
use crate::state::with_locks;
use futures::StreamExt;
use std::result::Result;

Expand All @@ -299,6 +305,34 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn multiple_operation() -> Result<(), Box<dyn std::error::Error>> {
let client = create_instance()?;
let key = "key".to_string();
let value = "value".as_bytes().to_vec();
let locks = client
.acquire_locks(vec![(Keyspace::ActiveJobs, ""), (Keyspace::Slots, "")])
.await?;

let _r: ballista_core::error::Result<()> = with_locks(locks, async {
let txn_ops = vec![
(Operation::Put(value.clone()), Keyspace::Slots, key.clone()),
(
Operation::Put(value.clone()),
Keyspace::ActiveJobs,
key.clone(),
),
];
client.apply_txn(txn_ops).await?;
Ok(())
})
.await;

assert_eq!(client.get(Keyspace::Slots, key.as_str()).await?, value);
assert_eq!(client.get(Keyspace::ActiveJobs, key.as_str()).await?, value);
Ok(())
}

#[tokio::test]
async fn read_empty() -> Result<(), Box<dyn std::error::Error>> {
let client = create_instance()?;
Expand Down
Loading