Skip to content

Commit

Permalink
feat: Allow empty agg hash table (distinct), initial CSV data source (#…
Browse files Browse the repository at this point in the history
…117)

* allow zero aggregates in agg hash table, use for distinct (ALL)

* hash nulls

* nested queries, uncomment some tests

* use single user engine in shell

* provide queries as arg

* int64 timestamps from parquet

* bump version

* additional parquet tests

* file source trait

* read stream for http

* csv decoder

* begin wiring up

* tests

* fix bugs

* Add test, implement stream for wasm

* bump version

* lint

* comments
  • Loading branch information
scsmithr authored Jul 11, 2024
1 parent 9c0e02f commit e9cb5b4
Show file tree
Hide file tree
Showing 48 changed files with 2,274 additions and 269 deletions.
81 changes: 58 additions & 23 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ default-members = ["crates/*", "test_bin"]
resolver = "2"

[workspace.package]
version = "0.0.12"
version = "0.0.14"
edition = "2021"

[profile.release]
Expand All @@ -29,3 +29,5 @@ tokio = { version = "1.38.0", default-features = false, features = ["rt", "time"
tracing = { version = "0.1.40", default-features = false }
regex = "1.10.5"
url = "2.5.1"
serde = "1.0.204"
bytes = "1.6.0"
3 changes: 0 additions & 3 deletions crates/parquet/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@
use std::error::Error;
use std::{cell, io, result, str};

#[cfg(feature = "arrow")]
use arrow_schema::ArrowError;

/// Parquet error enumeration
// Note: we don't implement PartialEq as the semantics for the
// external variant are not well defined (#4469)
Expand Down
2 changes: 2 additions & 0 deletions crates/rayexec_bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ rayexec_rt_native = { path = '../rayexec_rt_native' }
rayexec_bullet = { path = '../rayexec_bullet' }
rayexec_postgres = { path = '../rayexec_postgres' }
rayexec_parquet = { path = '../rayexec_parquet', features = ["zstd"] }
rayexec_csv = { path = '../rayexec_csv' }
tracing = { workspace = true }
tracing-subscriber = {version = "0.3", features = ["std", "fmt", "json", "env-filter"] }
futures = { workspace = true }
crossterm = "0.27.0"
clap = { version = "4.5.9", features = ["derive"] }
50 changes: 41 additions & 9 deletions crates/rayexec_bin/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,35 @@
use std::io::BufWriter;
use std::io::{BufWriter, Write};
use std::sync::Arc;

use clap::Parser;
use crossterm::event::{self, Event, KeyModifiers};
use rayexec_csv::CsvDataSource;
use rayexec_error::Result;
use rayexec_execution::datasource::{DataSourceRegistry, MemoryDataSource};
use rayexec_execution::engine::Engine;
use rayexec_execution::runtime::ExecutionRuntime;
use rayexec_parquet::ParquetDataSource;
use rayexec_postgres::PostgresDataSource;
use rayexec_rt_native::runtime::ThreadedExecutionRuntime;
use rayexec_shell::lineedit::KeyEvent;
use rayexec_shell::session::SingleUserEngine;
use rayexec_shell::shell::{Shell, ShellSignal};
use tracing_subscriber::filter::EnvFilter;
use tracing_subscriber::FmtSubscriber;

#[derive(Parser)]
#[clap(name = "rayexec_bin")]
struct Arguments {
/// Queries to execute.
///
/// If omitted, an interactive session will be automatically started.
#[clap(trailing_var_arg = true)]
queries: Vec<String>,
}

/// Simple binary for quickly running arbitrary queries.
fn main() {
let args = Arguments::parse();

let env_filter = EnvFilter::builder()
.with_default_directive(tracing::Level::ERROR.into())
.from_env_lossy()
Expand Down Expand Up @@ -43,7 +57,7 @@ fn main() {
// Note we do an explicit clone here to avoid dropping the tokio runtime
// owned by the execution runtime inside the async context.
let runtime_clone = runtime.clone();
let result = tokio_handle.block_on(async move { inner(runtime_clone).await });
let result = tokio_handle.block_on(async move { inner(args, runtime_clone).await });

if let Err(e) = result {
println!("ERROR: {e}");
Expand All @@ -70,22 +84,40 @@ fn from_crossterm_keycode(code: crossterm::event::KeyCode) -> KeyEvent {
}
}

async fn inner(runtime: Arc<dyn ExecutionRuntime>) -> Result<()> {
async fn inner(args: Arguments, runtime: Arc<dyn ExecutionRuntime>) -> Result<()> {
let registry = DataSourceRegistry::default()
.with_datasource("memory", Box::new(MemoryDataSource))?
.with_datasource("postgres", Box::new(PostgresDataSource))?
.with_datasource("parquet", Box::new(ParquetDataSource))?;
let engine = Engine::new_with_registry(runtime, registry)?;
let session = engine.new_session()?;
.with_datasource("parquet", Box::new(ParquetDataSource))?
.with_datasource("csv", Box::new(CsvDataSource))?;
let engine = SingleUserEngine::new_with_runtime(runtime, registry)?;

let (cols, _rows) = crossterm::terminal::size()?;
let stdout = BufWriter::new(std::io::stdout());
let mut stdout = BufWriter::new(std::io::stdout());

if !args.queries.is_empty() {
// Queries provided directly, run and print them, and exit.

// TODO: Check if file, read file...

for query in args.queries {
let tables = engine.sql(&query).await?;
for table in tables {
writeln!(stdout, "{}", table.pretty_table(cols as usize, None)?)?;
}
stdout.flush()?;
}

return Ok(());
}

// Otherwise continue on with interactive shell.

crossterm::terminal::enable_raw_mode()?;

let shell = Shell::new(stdout);
shell.set_cols(cols as usize);
shell.attach(session, "Rayexec Shell")?;
shell.attach(engine, "Rayexec Shell")?;

let inner_loop = || async move {
loop {
Expand Down
6 changes: 6 additions & 0 deletions crates/rayexec_bullet/src/compute/cast/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ pub struct FromStrParser<T: FromStr> {
_type: PhantomData<T>,
}

impl<T: FromStr> FromStrParser<T> {
pub const fn new() -> Self {
FromStrParser { _type: PhantomData }
}
}

impl<T: FromStr> Parser for FromStrParser<T> {
type Type = T;
fn parse(&mut self, s: &str) -> Option<Self::Type> {
Expand Down
Loading

0 comments on commit e9cb5b4

Please sign in to comment.