@@ -70,6 +70,8 @@ impl BallistaContextState {
70
70
concurrent_tasks : usize ,
71
71
) -> ballista_core:: error:: Result < Self > {
72
72
use ballista_core:: serde:: protobuf:: scheduler_grpc_client:: SchedulerGrpcClient ;
73
+ use ballista_core:: serde:: protobuf:: PhysicalPlanNode ;
74
+ use ballista_core:: serde:: BallistaCodec ;
73
75
74
76
log:: info!( "Running in local mode. Scheduler will be run in-proc" ) ;
75
77
@@ -90,7 +92,16 @@ impl BallistaContextState {
90
92
}
91
93
} ;
92
94
93
- ballista_executor:: new_standalone_executor ( scheduler, concurrent_tasks) . await ?;
95
+ let default_codec: BallistaCodec < LogicalPlanNode , PhysicalPlanNode > =
96
+ BallistaCodec :: default ( ) ;
97
+
98
+ ballista_executor:: new_standalone_executor (
99
+ scheduler,
100
+ concurrent_tasks,
101
+ default_codec,
102
+ )
103
+ . await ?;
104
+
94
105
Ok ( Self {
95
106
config : config. clone ( ) ,
96
107
scheduler_host : "localhost" . to_string ( ) ,
@@ -458,13 +469,17 @@ mod tests {
458
469
459
470
#[ tokio:: test]
460
471
#[ cfg( feature = "standalone" ) ]
472
+ #[ ignore]
473
+ // Tracking: https://github.com/apache/arrow-datafusion/issues/1840
461
474
async fn test_task_stuck_when_referenced_task_failed ( ) {
462
475
use super :: * ;
463
476
use datafusion:: arrow:: datatypes:: Schema ;
464
477
use datafusion:: arrow:: util:: pretty;
465
478
use datafusion:: datasource:: file_format:: csv:: CsvFormat ;
466
479
use datafusion:: datasource:: file_format:: parquet:: ParquetFormat ;
467
- use datafusion:: datasource:: listing:: { ListingOptions , ListingTable } ;
480
+ use datafusion:: datasource:: listing:: {
481
+ ListingOptions , ListingTable , ListingTableConfig ,
482
+ } ;
468
483
469
484
use ballista_core:: config:: {
470
485
BallistaConfigBuilder , BALLISTA_WITH_INFORMATION_SCHEMA ,
@@ -502,12 +517,16 @@ mod tests {
502
517
collect_stat : x. collect_stat ,
503
518
target_partitions : x. target_partitions ,
504
519
} ;
505
- let error_table = ListingTable :: new (
520
+
521
+ let config = ListingTableConfig :: new (
506
522
listing_table. object_store ( ) . clone ( ) ,
507
523
listing_table. table_path ( ) . to_string ( ) ,
508
- Arc :: new ( Schema :: new ( vec ! [ ] ) ) ,
509
- error_options,
510
- ) ;
524
+ )
525
+ . with_schema ( Arc :: new ( Schema :: new ( vec ! [ ] ) ) )
526
+ . with_listing_options ( error_options) ;
527
+
528
+ let error_table = ListingTable :: try_new ( config) . unwrap ( ) ;
529
+
511
530
// change the table to an error table
512
531
guard
513
532
. tables
0 commit comments