
Enable periodic cleanup of work_dir directories in ballista executor #1783

Merged
merged 11 commits on Mar 8, 2022
1 change: 1 addition & 0 deletions ballista/rust/executor/Cargo.toml
@@ -47,6 +47,7 @@
tonic = "0.6"
uuid = { version = "0.8", features = ["v4"] }
hyper = "0.14.4"
parking_lot = "0.12"
chrono = { version = "0.4", default-features = false }

[dev-dependencies]

18 changes: 18 additions & 0 deletions ballista/rust/executor/executor_config_spec.toml
@@ -78,3 +78,21 @@
name = "task_scheduling_policy"
type = "ballista_core::config::TaskSchedulingPolicy"
doc = "The task scheduing policy for the scheduler, see TaskSchedulingPolicy::variants() for options. Default: PullStaged"
default = "ballista_core::config::TaskSchedulingPolicy::PullStaged"

[[param]]
name = "executor_cleanup_enable"
type = "bool"
doc = "Enable periodic cleanup of work_dir directories."
default = "true"
Contributor

Can we set the default value to false and enable it only after a longer period of testing?
@Ted-Jiang
If the default is false, this feature will not impact current behavior. Users who want this feature can enable it.
Do you agree? @alamb

Contributor

I think it would be a good idea to have this feature turned off by default so it can be tested more thoroughly.


[[param]]
name = "executor_cleanup_interval"
type = "u64"
doc = "Controls the interval in seconds , which the worker cleans up old job dirs on the local machine."
default = "1800"

[[param]]
name = "executor_cleanup_ttl"
type = "i64"
Contributor

it's better to use u64

Member Author

Thanks, fixed!

doc = "The number of seconds to retain job directories on each worker 604800 (7 days, 7 * 24 * 3600), In other words, after job done, how long the resulting data is retained"
default = "604800"
129 changes: 128 additions & 1 deletion ballista/rust/executor/src/main.rs
@@ -17,13 +17,17 @@

//! Ballista Rust executor binary.

use chrono::{DateTime, Duration, Utc};
use std::sync::Arc;
use std::time::Duration as StdDuration;

use anyhow::{Context, Result};
use arrow_flight::flight_service_server::FlightServiceServer;
use ballista_executor::{execution_loop, executor_server};
use log::info;
use log::{error, info};
use tempfile::TempDir;
use tokio::fs::ReadDir;
use tokio::{fs, time};
use tonic::transport::Server;
use uuid::Uuid;

@@ -112,6 +116,21 @@ async fn main() -> Result<()> {
        .context("Could not connect to scheduler")?;

    let scheduler_policy = opt.task_scheduling_policy;
    let cleanup_ttl = opt.executor_cleanup_ttl;
Contributor

I wonder if it might be possible to use NamedTempFile or some other struct from tempfile (already a dependency): https://crates.io/crates/tempfile

There are at least two benefits:

  1. The files are dropped immediately after they are no longer used, so the required intermediate disk space is minimized
  2. They can't be accidentally deleted while still in use (which perhaps matters for long-running queries)

I did something similar in DataFusion here: #1680
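
Below is a minimal sketch of the suggested approach, assuming shuffle output were written through tempfile's NamedTempFile (the helper name write_shuffle_output is hypothetical and not part of this PR). The file is removed automatically when the handle is dropped, so no periodic cleanup pass would be needed:

use std::io::Write;
use tempfile::NamedTempFile;

fn write_shuffle_output(work_dir: &str) -> std::io::Result<()> {
    // create the temp file inside the executor's work_dir rather than /tmp
    let mut file = NamedTempFile::new_in(work_dir)?;
    file.write_all(b"intermediate shuffle bytes")?;
    // the path stays valid only while the handle is alive, so any reader of
    // this stage's output would need to keep the handle (or the file) open
    println!("shuffle data at {:?}", file.path());
    Ok(())
} // handle dropped here -> the file is deleted from disk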

Member Author

@alamb Thanks for your advice 😊!
IMHO, if a job has 3 stages, and stage 2 reads stage 1's input and then deletes the file, but a stage 2 task fails,
the Ballista scheduler will start a task to re-read stage 1's input. I think using NamedTempFile would add trouble and complexity:
we need to keep the files for task recovery and stage retry (like Spark). So I decided that if none of the files under a job_dir have been modified within the TTL, we can safely delete it.
If I am not right, please correct me 🙈

Contributor

I honestly don't know enough about Ballista and its executor to say.

What do you think @yahoNanJing and @liukun4515 ?


    if opt.executor_cleanup_enable {
        let interval = opt.executor_cleanup_interval;
        let mut interval_time = time::interval(StdDuration::from_secs(interval));
        tokio::spawn(async move {
            loop {
                interval_time.tick().await;
                if let Err(e) = clean_shuffle_data_loop(&work_dir, cleanup_ttl).await {
                    error!("Ballista executor failed to clean up shuffle data: {:?}", e)
                }
            }
        });
    }

    match scheduler_policy {
        TaskSchedulingPolicy::PushStaged => {
            tokio::spawn(executor_server::startup(
@@ -148,3 +167,111 @@ async fn main() -> Result<()> {

    Ok(())
}

/// This function is scheduled to run periodically to clean up the executor's work_dir.
/// It only removes directories under work_dir; loose files are left untouched.
async fn clean_shuffle_data_loop(work_dir: &str, seconds: i64) -> Result<()> {
    let mut dir = fs::read_dir(work_dir).await?;
    let mut to_delete = Vec::new();
    while let Some(child) = dir.next_entry().await? {
        if let Ok(metadata) = child.metadata().await {
            // only delete the job directories
            if metadata.is_dir() {
                let dir = fs::read_dir(child.path()).await?;
                match check_modified_time_in_dirs(vec![dir], seconds).await {
                    Ok(true) => to_delete.push(child.path().into_os_string()),
                    Ok(false) => {}
                    Err(e) => {
                        error!("Error in clean_shuffle_data_loop: {:?}", e)
                    }
                }
            }
        } else {
            error!("Can not get metadata from file: {:?}", child)
        }
    }
    info!(
        "Executor work_dir entries {:?} have not been modified within {:?} seconds and will be deleted",
        &to_delete, seconds
    );
    for del in to_delete {
        fs::remove_dir_all(del).await?;
    }
    Ok(())
}

/// Determines whether every file under the given directories (recursively)
/// is older than the TTL cutoff.
async fn check_modified_time_in_dirs(
    mut vec: Vec<ReadDir>,
    ttl_seconds: i64,
) -> Result<bool> {
    let cutoff = Utc::now() - Duration::seconds(ttl_seconds);

    while let Some(mut dir) = vec.pop() {
        while let Some(child) = dir.next_entry().await? {
            let meta = child.metadata().await?;
            if meta.is_dir() {
                let dir = fs::read_dir(child.path()).await?;
                // push subdirectories so they are checked in a later iteration
                vec.push(dir);
            } else {
                let modified_time: DateTime<Utc> =
                    meta.modified().map(chrono::DateTime::from)?;
                if modified_time > cutoff {
                    // a file was modified within the TTL, so keep the whole dir
                    return Ok(false);
                }
            }
        }
    }
    Ok(true)
}

#[cfg(test)]
mod tests {
    use crate::clean_shuffle_data_loop;
    use std::fs;
    use std::fs::File;
    use std::io::Write;
    use std::time::Duration;
    use tempfile::TempDir;

    #[tokio::test]
    async fn test_executor_clean_up() {
        let work_dir = TempDir::new().unwrap().into_path();
        let job_dir = work_dir.as_path().join("job_id");
        let file_path = job_dir.as_path().join("tmp.csv");
        let data = "Jorge,2018-12-13T12:12:10.011Z\n\
                    Andrew,2018-11-13T17:11:10.011Z";
        fs::create_dir(job_dir).unwrap();
        File::create(&file_path)
            .expect("creating temp file")
            .write_all(data.as_bytes())
            .expect("writing data");

        let work_dir_clone = work_dir.clone();

        let count1 = fs::read_dir(work_dir.clone()).unwrap().count();
        assert_eq!(count1, 1);

        let mut handles = vec![];
        handles.push(tokio::spawn(async move {
            // sleep past the 1 second TTL so the job dir becomes eligible for deletion
            tokio::time::sleep(Duration::from_secs(2)).await;
            clean_shuffle_data_loop(work_dir_clone.to_str().unwrap(), 1)
                .await
                .unwrap();
        }));
        futures::future::join_all(handles).await;

        let count2 = fs::read_dir(work_dir.clone()).unwrap().count();
        assert_eq!(count2, 0);
    }
}