diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 22c8e69bc902..769b3ac36733 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -814,9 +814,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.80" +version = "1.0.82" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51f1226cd9da55587234753d1245dd5b132343ea240f26b6a9003d68706141ba" +checksum = "305fe645edc1442a0fa8b6726ba61d422798d37a52e12eaecf4b022ebbb88f01" dependencies = [ "jobserver", "libc", @@ -1134,9 +1134,11 @@ dependencies = [ name = "datafusion-execution" version = "28.0.0" dependencies = [ + "arrow", "dashmap", "datafusion-common", "datafusion-expr", + "futures", "hashbrown 0.14.0", "log", "object_store", @@ -1156,7 +1158,7 @@ dependencies = [ "lazy_static", "sqlparser", "strum 0.25.0", - "strum_macros 0.25.1", + "strum_macros 0.25.2", ] [[package]] @@ -1222,9 +1224,9 @@ dependencies = [ [[package]] name = "deranged" -version = "0.3.6" +version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8810e7e2cf385b1e9b50d68264908ec367ba642c96d02edfe61c39e88e2a3c01" +checksum = "7684a49fb1af197853ef7b2ee694bc1f5b4179556f1e5710e1760c5db6f5e929" [[package]] name = "difflib" @@ -1735,7 +1737,7 @@ dependencies = [ "futures-util", "http", "hyper", - "rustls 0.21.5", + "rustls 0.21.6", "tokio", "tokio-rustls 0.24.1", ] @@ -2365,18 +2367,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "030ad2bc4db10a8944cb0d837f158bdfec4d4a4873ab701a95046770d11f8842" +checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec2e072ecce94ec471b13398d5402c188e76ac03cf74dd1a975161b23a3f6d9c" +checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", @@ -2385,9 +2387,9 @@ dependencies = [ [[package]] name = "pin-project-lite" -version = "0.2.10" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c40d25201921e5ff0c862a505c6557ea88568a4e3ace775ab55e93f2f4f9d57" +checksum = "2c516611246607d0c04186886dbb3a754368ef82c79e9827a802c6d836dd111c" [[package]] name = "pin-utils" @@ -2567,9 +2569,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.9.1" +version = "1.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2eae68fc220f7cf2532e4494aded17545fce192d59cd996e0fe7887f4ceb575" +checksum = "81bc1d4caf89fac26a70747fe603c130093b53c773888797a6329091246d651a" dependencies = [ "aho-corasick", "memchr", @@ -2579,9 +2581,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.4" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7b6d6190b7594385f61bd3911cd1be99dfddcfc365a4160cc2ab5bff4aed294" +checksum = "fed1ceff11a1dddaee50c9dc8e4938bd106e9d89ae372f192311e7da498e3b69" dependencies = [ "aho-corasick", "memchr", @@ -2617,7 +2619,7 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls 0.21.5", + "rustls 0.21.6", "rustls-pemfile", "serde", "serde_json", @@ -2693,9 +2695,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.4" +version = "0.38.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a962918ea88d644592894bc6dc55acc6c0956488adcebbfb6e273506b7fd6e5" +checksum = "172891ebdceb05aa0005f533a6cbfca599ddd7d966f6f5d4d9b2e70478e70399" dependencies = [ "bitflags 2.3.3", "errno", @@ -2718,9 +2720,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.5" +version = "0.21.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79ea77c539259495ce8ca47f53e66ae0330a8819f67e23ac96ca02f50e7b7d36" +checksum = "1d1feddffcfcc0b33f5c6ce9a29e341e4cd59c3f78e7ee45f4a40c038b1d6cbb" dependencies = [ "log", "ring", @@ -2751,9 +2753,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.101.2" +version = "0.101.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "513722fd73ad80a71f72b61009ea1b584bcfa1483ca93949c8f290298837fa59" +checksum = "261e9e0888cba427c3316e6322805653c9425240b6fd96cee7cb671ab70ab8d0" dependencies = [ "ring", "untrusted", @@ -2865,18 +2867,18 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.180" +version = "1.0.183" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ea67f183f058fe88a4e3ec6e2788e003840893b91bac4559cabedd00863b3ed" +checksum = "32ac8da02677876d532745a130fc9d8e6edfa81a269b107c5b00829b91d8eb3c" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.180" +version = "1.0.183" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24e744d7782b686ab3b73267ef05697159cc0e5abbed3f47f9933165e5219036" +checksum = "aafe972d60b0b9bee71a91b92fee2d4fb3c9d7e8f6b179aa99f27203d99a4816" dependencies = [ "proc-macro2", "quote", @@ -3033,7 +3035,7 @@ version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125" dependencies = [ - "strum_macros 0.25.1", + "strum_macros 0.25.2", ] [[package]] @@ -3051,9 +3053,9 @@ dependencies = [ [[package]] name = "strum_macros" -version = "0.25.1" +version = "0.25.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6069ca09d878a33f883cc06aaa9718ede171841d3832450354410b718b097232" +checksum = "ad8d03b598d3d0fff69bf533ee3ef19b8eeb342729596df84bcc7e1f96ec4059" dependencies = [ "heck", "proc-macro2", @@ -3092,9 +3094,9 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.7.0" +version = "3.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5486094ee78b2e5038a6382ed7645bc084dc2ec433426ca4c3cb61e2007b8998" +checksum = "dc02fddf48964c42031a0b3fe0428320ecf3a73c401040fc0096f97794310651" dependencies = [ "cfg-if", "fastrand 2.0.0", @@ -3157,9 +3159,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.24" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b79eabcd964882a646b3584543ccabeae7869e9ac32a46f6f22b7a5bd405308b" +checksum = "b0fdd63d58b18d663fbdf70e049f00a22c8e42be082203be7f26589213cd75ea" dependencies = [ "deranged", "serde", @@ -3253,7 +3255,7 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls 0.21.5", + "rustls 0.21.6", "tokio", ] diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index 66254ee6f5f8..cc5e30ecdbd4 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -35,59 +35,19 @@ pub use datafusion_expr::Accumulator; pub use datafusion_expr::ColumnarValue; use datafusion_physical_expr::equivalence::OrderingEquivalenceProperties; pub use display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay}; -use futures::stream::{Stream, TryStreamExt}; +use futures::stream::TryStreamExt; use std::fmt; use std::fmt::Debug; use tokio::task::JoinSet; use datafusion_common::tree_node::Transformed; use datafusion_common::DataFusionError; +use std::any::Any; use std::sync::Arc; -use std::task::{Context, Poll}; -use std::{any::Any, pin::Pin}; -/// Trait for types that stream [arrow::record_batch::RecordBatch] -pub trait RecordBatchStream: Stream> { - /// Returns the schema of this `RecordBatchStream`. - /// - /// Implementation of this trait should guarantee that all `RecordBatch`'s returned by this - /// stream should have the same schema as returned from this method. - fn schema(&self) -> SchemaRef; -} - -/// Trait for a [`Stream`](futures::stream::Stream) of [`RecordBatch`]es -pub type SendableRecordBatchStream = Pin>; - -/// EmptyRecordBatchStream can be used to create a RecordBatchStream -/// that will produce no results -pub struct EmptyRecordBatchStream { - /// Schema wrapped by Arc - schema: SchemaRef, -} - -impl EmptyRecordBatchStream { - /// Create an empty RecordBatchStream - pub fn new(schema: SchemaRef) -> Self { - Self { schema } - } -} - -impl RecordBatchStream for EmptyRecordBatchStream { - fn schema(&self) -> SchemaRef { - self.schema.clone() - } -} - -impl Stream for EmptyRecordBatchStream { - type Item = Result; - - fn poll_next( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll> { - Poll::Ready(None) - } -} +// backwards compatibility +pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream}; +pub use stream::EmptyRecordBatchStream; /// `ExecutionPlan` represent nodes in the DataFusion Physical Plan. /// diff --git a/datafusion/core/src/physical_plan/stream.rs b/datafusion/core/src/physical_plan/stream.rs index bdc2050b2464..2b916b7ee263 100644 --- a/datafusion/core/src/physical_plan/stream.rs +++ b/datafusion/core/src/physical_plan/stream.rs @@ -17,7 +17,10 @@ //! Stream wrappers for physical operators +use std::pin::Pin; use std::sync::Arc; +use std::task::Context; +use std::task::Poll; use crate::physical_plan::displayable; use arrow::{datatypes::SchemaRef, record_batch::RecordBatch}; @@ -231,9 +234,9 @@ impl Stream for RecordBatchReceiverStream { type Item = Result; fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { self.inner.poll_next_unpin(cx) } } @@ -276,10 +279,7 @@ where { type Item = Result; - fn poll_next( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.project().stream.poll_next(cx) } @@ -297,6 +297,37 @@ where } } +/// EmptyRecordBatchStream can be used to create a RecordBatchStream +/// that will produce no results +pub struct EmptyRecordBatchStream { + /// Schema wrapped by Arc + schema: SchemaRef, +} + +impl EmptyRecordBatchStream { + /// Create an empty RecordBatchStream + pub fn new(schema: SchemaRef) -> Self { + Self { schema } + } +} + +impl RecordBatchStream for EmptyRecordBatchStream { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + +impl Stream for EmptyRecordBatchStream { + type Item = Result; + + fn poll_next( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(None) + } +} + /// Stream wrapper that records `BaselineMetrics` for a particular /// `[SendableRecordBatchStream]` (likely a partition) pub(crate) struct ObservedStream { @@ -326,9 +357,9 @@ impl futures::Stream for ObservedStream { type Item = Result; fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { let poll = self.inner.poll_next_unpin(cx); self.baseline_metrics.record_poll(poll) } diff --git a/datafusion/execution/Cargo.toml b/datafusion/execution/Cargo.toml index a9c8d0027b2e..b87e73cdf2cc 100644 --- a/datafusion/execution/Cargo.toml +++ b/datafusion/execution/Cargo.toml @@ -33,9 +33,11 @@ name = "datafusion_execution" path = "src/lib.rs" [dependencies] +arrow = { workspace = true } dashmap = "5.4.0" datafusion-common = { path = "../common", version = "28.0.0" } datafusion-expr = { path = "../expr", version = "28.0.0" } +futures = "0.3" hashbrown = { version = "0.14", features = ["raw"] } log = "^0.4" object_store = "0.6.1" diff --git a/datafusion/execution/src/lib.rs b/datafusion/execution/src/lib.rs index 46ffe1294256..57d77aa1dde0 100644 --- a/datafusion/execution/src/lib.rs +++ b/datafusion/execution/src/lib.rs @@ -23,8 +23,10 @@ pub mod memory_pool; pub mod object_store; pub mod registry; pub mod runtime_env; +mod stream; mod task; pub use disk_manager::DiskManager; pub use registry::FunctionRegistry; +pub use stream::{RecordBatchStream, SendableRecordBatchStream}; pub use task::TaskContext; diff --git a/datafusion/execution/src/stream.rs b/datafusion/execution/src/stream.rs new file mode 100644 index 000000000000..5a1a9aaa2590 --- /dev/null +++ b/datafusion/execution/src/stream.rs @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::{datatypes::SchemaRef, record_batch::RecordBatch}; +use datafusion_common::Result; +use futures::Stream; +use std::pin::Pin; + +/// Trait for types that stream [arrow::record_batch::RecordBatch] +pub trait RecordBatchStream: Stream> { + /// Returns the schema of this `RecordBatchStream`. + /// + /// Implementation of this trait should guarantee that all `RecordBatch`'s returned by this + /// stream should have the same schema as returned from this method. + fn schema(&self) -> SchemaRef; +} + +/// Trait for a [`Stream`](futures::stream::Stream) of [`RecordBatch`]es +pub type SendableRecordBatchStream = Pin>;