Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler now verifies that file:// ListingTable URLs are accessible #414

Merged
merged 7 commits into from
Oct 23, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
scheduler now verifies that ListingTable URLs are accessible
  • Loading branch information
andygrove committed Oct 20, 2022
commit 708ac9c319c4d98abecf80f2e2507bee4a95d03a
1 change: 1 addition & 0 deletions ballista/scheduler/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ pub fn remove_unresolved_shuffles(
.iter()
.map(|c| c
.iter()
.filter(|l| !l.path.is_empty())
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unrelated change but this avoids printing out lots of newlines in the log

.map(|l| l.path.clone())
.collect::<Vec<_>>()
.join(", "))
Expand Down
4 changes: 2 additions & 2 deletions ballista/scheduler/src/scheduler_server/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
use datafusion_proto::logical_plan::AsLogicalPlan;
use futures::TryStreamExt;
use log::{debug, error, info, warn};
use log::{debug, error, info, trace, warn};

use std::ops::Deref;
use std::sync::Arc;
Expand Down Expand Up @@ -71,7 +71,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
task_status,
} = request.into_inner()
{
debug!("Received poll_work request for {:?}", metadata);
trace!("Received poll_work request for {:?}", metadata);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was too verbose to be debug

// We might receive buggy poll work requests from dead executors.
if self
.state
Expand Down
56 changes: 55 additions & 1 deletion ballista/scheduler/src/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
// specific language governing permissions and limitations
// under the License.

use datafusion::datasource::listing::{ListingTable, ListingTableUrl};
use datafusion::datasource::source_as_provider;
use datafusion::logical_expr::PlanVisitor;
use std::any::type_name;
use std::future::Future;
use std::sync::Arc;
Expand All @@ -31,6 +34,7 @@ use ballista_core::error::{BallistaError, Result};
use ballista_core::serde::protobuf::TaskStatus;
use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
use datafusion::logical_plan::LogicalPlan;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_proto::logical_plan::AsLogicalPlan;
use log::{debug, error, info};
Expand Down Expand Up @@ -256,11 +260,61 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
plan: &LogicalPlan,
) -> Result<()> {
let start = Instant::now();

// optimizing the plan here is redundant because the physical planner will do this again
// but it is helpful to see what the optimized plan will be
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch. How about changing this to the following:

if log::max_level() >= log::Level::Debug { let optimized_plan = session_ctx.optimize(plan)?; debug!("Optimized plan: {}", optimized_plan.display_indent()); }
so that we can avoid unnecessary optimization when the log level in set to be info.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point. Maybe we just need to check the first path.

let optimized_plan = session_ctx.optimize(plan)?;
debug!("Optimized plan: {}", optimized_plan.display_indent());

struct VerifyPathsExist {}
impl PlanVisitor for VerifyPathsExist {
type Error = BallistaError;

fn pre_visit(
&mut self,
plan: &LogicalPlan,
) -> std::result::Result<bool, Self::Error> {
match plan {
LogicalPlan::TableScan(scan) => {
let provider = source_as_provider(&scan.source)?;
if let Some(table) =
provider.as_any().downcast_ref::<ListingTable>()
{
debug!(
"ListingTable with {} urls",
table.table_paths().len()
);
for url in table.table_paths() {
// remove file:// prefix and verify that the file is accessible
let url = url.as_str();
let url = if url.starts_with("file://") {
&url[7..]
} else {
url
};
ListingTableUrl::parse(url)
.map_err(|e| BallistaError::General(
format!("logical plan refers to path that is not accessible in scheduler file system: {}: {:?}", url, e)))?;
}
Ok(true)
} else {
debug!("TableProvider is not a ListingTable");
Ok(true)
}
}
_ => Ok(true),
}
}
}

debug!("Calculated optimized plan: {:?}", optimized_plan);
let mut verify_paths_exist = VerifyPathsExist {};
optimized_plan.accept(&mut verify_paths_exist)?;

let plan = session_ctx.create_physical_plan(&optimized_plan).await?;
debug!(
"Physical plan: {}",
DisplayableExecutionPlan::new(plan.as_ref()).indent()
);

self.task_manager
.submit_job(job_id, job_name, &session_ctx.session_id(), plan)
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ services:
- "80:80"
- "50050:50050"
environment:
- RUST_LOG=info
- RUST_LOG=ballista=debug,info
volumes:
- ./benchmarks/data:/data
depends_on:
Expand Down