diff --git a/ballista/rust/client/README.md b/ballista/rust/client/README.md index f5fe094d9..1c6a15ce0 100644 --- a/ballista/rust/client/README.md +++ b/ballista/rust/client/README.md @@ -84,50 +84,72 @@ To build a simple ballista example, add the following dependencies to your `Carg ```toml [dependencies] -ballista = "0.6" -datafusion = "7.0" +ballista = "0.8" +datafusion = "12.0.0" tokio = "1.0" ``` -The following example runs a simple aggregate SQL query against a CSV file from the +The following example runs a simple aggregate query, built with the DataFrame API, against a Parquet file (`yellow_tripdata_2022-01.parquet`) from the [New York Taxi and Limousine Commission](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page) -data set. +data set. Download the file and add it to the `testdata` folder before running the example. ```rust,no_run use ballista::prelude::*; +use datafusion::prelude::{col, min, max, avg, sum, ParquetReadOptions}; use datafusion::arrow::util::pretty; use datafusion::prelude::CsvReadOptions; #[tokio::main] async fn main() -> Result<()> { - // create configuration - let config = BallistaConfig::builder() - .set("ballista.shuffle.partitions", "4") - .build()?; - - // connect to Ballista scheduler - let ctx = BallistaContext::remote("localhost", 50050, &config).await?; - - // register csv file with the execution context - ctx.register_csv( - "tripdata", - "/path/to/yellow_tripdata_2020-01.csv", - CsvReadOptions::new(), - ).await?; - - // execute the query - let df = ctx.sql( - "SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), AVG(fare_amount), SUM(fare_amount) - FROM tripdata - GROUP BY passenger_count - ORDER BY passenger_count", - ).await?; - - // collect the results and print them to stdout - let results = df.collect().await?; - pretty::print_batches(&results)?; - Ok(()) + // create configuration + let config = BallistaConfig::builder() + .set("ballista.shuffle.partitions", "4") + .build()?; + + // connect to Ballista scheduler + let ctx = 
BallistaContext::remote("localhost", 50050, &config).await?; + + let filename = "testdata/yellow_tripdata_2022-01.parquet"; + + // define the query using the DataFrame trait + let df = ctx + .read_parquet(filename, ParquetReadOptions::default()) + .await? + .select_columns(&["passenger_count", "fare_amount"])? + .aggregate(vec![col("passenger_count")], vec![min(col("fare_amount")), max(col("fare_amount")), avg(col("fare_amount")), sum(col("fare_amount"))])? + .sort(vec![col("passenger_count").sort(true,true)])?; + + // this is equivalent to the following SQL + // SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), AVG(fare_amount), SUM(fare_amount) + // FROM tripdata + // GROUP BY passenger_count + // ORDER BY passenger_count + + // print the results + df.show().await?; + + Ok(()) } ``` +The output should look similar to the following table. + +```text ++-----------------+--------------------------+--------------------------+--------------------------+--------------------------+ +| passenger_count | MIN(?table?.fare_amount) | MAX(?table?.fare_amount) | AVG(?table?.fare_amount) | SUM(?table?.fare_amount) | ++-----------------+--------------------------+--------------------------+--------------------------+--------------------------+ +| | -159.5 | 285.2 | 17.60577640099004 | 1258865.829999991 | +| 0 | -115 | 500 | 11.794859107585335 | 614052.1600000001 | +| 1 | -480 | 401092.32 | 12.61028389876563 | 22623542.879999973 | +| 2 | -250 | 640.5 | 13.79501011585127 | 4732047.139999998 | +| 3 | -130 | 480 | 13.473184817311106 | 1139427.2400000002 | +| 4 | -250 | 464 | 14.232650547832726 | 502711.4499999997 | +| 5 | -52 | 668 | 12.160378472086954 | 624289.51 | +| 6 | -52 | 252.5 | 12.576583325529857 | 402916 | +| 7 | 7 | 79 | 61.77777777777778 | 556 | +| 8 | 8.3 | 115 | 79.9125 | 639.3 | +| 9 | 9.3 | 96.5 | 65.26666666666667 | 195.8 | 
++-----------------+--------------------------+--------------------------+--------------------------+--------------------------+ +``` + More [examples](https://github.com/apache/arrow-datafusion/tree/master/ballista-examples) can be found in the arrow-datafusion repository. diff --git a/examples/README.md b/examples/README.md index e80bbd290..3e6a00ce8 100644 --- a/examples/README.md +++ b/examples/README.md @@ -96,6 +96,11 @@ cargo run --release --bin sql ### Source code for distributed SQL example ```rust +use ballista::prelude::*; +use datafusion::prelude::CsvReadOptions; + +/// This example demonstrates executing a simple query against an Arrow data source (CSV) and +/// fetching results, using SQL #[tokio::main] async fn main() -> Result<()> { let config = BallistaConfig::builder() @@ -103,14 +108,23 @@ async fn main() -> Result<()> { .build()?; let ctx = BallistaContext::remote("localhost", 50050, &config).await?; - let filename = "testdata/alltypes_plain.parquet"; + // register csv file with the execution context + ctx.register_csv( + "test", + "testdata/aggregate_test_100.csv", + CsvReadOptions::new(), + ) + .await?; - // define the query using the DataFrame trait + // execute the query let df = ctx - .read_parquet(filename, ParquetReadOptions::default()) - .await? - .select_columns(&["id", "bool_col", "timestamp_col"])? - .filter(col("id").gt(lit(1)))?; + .sql( + "SELECT c1, MIN(c12), MAX(c12) \ + FROM test \ + WHERE c11 > 0.1 AND c11 < 0.9 \ + GROUP BY c1", + ) + .await?; // print the results df.show().await?;