From ca51ee0aee23bc40498df7219ac43e626d4214b5 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Sun, 17 Nov 2024 15:39:11 +1100 Subject: [PATCH 1/8] c --- .../polars-io/src/parquet/read/read_impl.rs | 81 ++++++------------- crates/polars-io/src/pl_async.rs | 28 +++++++ 2 files changed, 53 insertions(+), 56 deletions(-) diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs index 07bb47bb88a7..83cd0013592f 100644 --- a/crates/polars-io/src/parquet/read/read_impl.rs +++ b/crates/polars-io/src/parquet/read/read_impl.rs @@ -28,6 +28,7 @@ use crate::hive::materialize_hive_partitions; use crate::mmap::{MmapBytesReader, ReaderBytes}; use crate::parquet::metadata::FileMetadataRef; use crate::parquet::read::ROW_COUNT_OVERFLOW_ERR; +use crate::pl_async::get_runtime; use crate::predicates::{apply_predicate, PhysicalIoExpr}; use crate::utils::get_reader_bytes; use crate::utils::slice::split_slice_at_file; @@ -1182,44 +1183,24 @@ impl BatchedParquetReader { .fetch_row_groups(row_group_range.clone()) .await?; - let mut dfs = match store { - ColumnStore::Local(_) => rg_to_dfs( - &store, - &mut self.rows_read, - row_group_range.start, - row_group_range.end, - self.slice, - &self.metadata, - &self.schema, - self.predicate.as_deref(), - self.row_index.clone(), - self.parallel, - &self.projection, - self.use_statistics, - self.hive_partition_columns.as_deref(), - ), - #[cfg(feature = "async")] - ColumnStore::Fetched(b) => { - // This branch we spawn the decoding and decompression of the bytes on a rayon task. - // This will ensure we don't block the async thread. - - // Reconstruct as that makes it a 'static. - let store = ColumnStore::Fetched(b); - let (tx, rx) = tokio::sync::oneshot::channel(); - - // Make everything 'static. 
- let mut rows_read = self.rows_read; - let row_index = self.row_index.clone(); - let predicate = self.predicate.clone(); - let schema = self.schema.clone(); - let metadata = self.metadata.clone(); - let parallel = self.parallel; - let projection = self.projection.clone(); - let use_statistics = self.use_statistics; - let hive_partition_columns = self.hive_partition_columns.clone(); - let slice = self.slice; - - let f = move || { + let mut dfs = { + // Spawn the decoding and decompression of the bytes on a rayon task. + // This will ensure we don't block the async thread. + + // Make everything 'static. + let mut rows_read = self.rows_read; + let row_index = self.row_index.clone(); + let predicate = self.predicate.clone(); + let schema = self.schema.clone(); + let metadata = self.metadata.clone(); + let parallel = self.parallel; + let projection = self.projection.clone(); + let use_statistics = self.use_statistics; + let hive_partition_columns = self.hive_partition_columns.clone(); + let slice = self.slice; + + let (dfs, rows_read) = get_runtime() + .spawn_rayon(move || { let dfs = rg_to_dfs( &store, &mut rows_read, @@ -1236,25 +1217,13 @@ impl BatchedParquetReader { hive_partition_columns.as_deref(), ); - // Don't unwrap send attempt - async task could be cancelled. - let _ = tx.send((dfs, rows_read)); - }; - - // Spawn the task and wait on it asynchronously. - if POOL.current_thread_index().is_some() { - // We are a rayon thread, so we can't use POOL.spawn as it would mean we spawn a task and block until - // another rayon thread executes it - we would deadlock if all rayon threads did this. - // Safety: The tokio runtime flavor is multi-threaded. 
- tokio::task::block_in_place(f); - } else { - POOL.spawn(f); - }; + dfs.map(|x| (x, rows_read)) + }) + .await?; - let (dfs, rows_read) = rx.await.unwrap(); - self.rows_read = rows_read; - dfs - }, - }?; + self.rows_read = rows_read; + dfs + }; if let Some(ca) = self.include_file_path.as_mut() { let mut max_len = 0; diff --git a/crates/polars-io/src/pl_async.rs b/crates/polars-io/src/pl_async.rs index 4c95c96f7733..293d10cb3eea 100644 --- a/crates/polars-io/src/pl_async.rs +++ b/crates/polars-io/src/pl_async.rs @@ -317,6 +317,34 @@ impl RuntimeManager { { self.rt.spawn_blocking(f) } + + /// Run a task on the rayon threadpool. To avoid deadlocks, if the current thread is already a + /// rayon thread, the task is executed on the current thread after tokio's `block_in_place` is + /// used to spawn another thread to poll futures. + pub async fn spawn_rayon<F, O>(&self, func: F) -> O + where + F: FnOnce() -> O + Send + Sync + 'static, + O: Send + Sync + 'static, + { + if POOL.current_thread_index().is_some() { + // We are a rayon thread, so we can't use POOL.spawn as it would mean we spawn a task and block until + // another rayon thread executes it - we would deadlock if all rayon threads did this. + // Safety: The tokio runtime flavor is multi-threaded. + tokio::task::block_in_place(func) + } else { + let (tx, rx) = tokio::sync::oneshot::channel(); + + let func = move || { + let out = func(); + // Don't unwrap send attempt - async task could be cancelled. 
+ let _ = tx.send(out); + }; + + POOL.spawn(func); + + rx.await.unwrap() + } + } } static RUNTIME: Lazy<RuntimeManager> = Lazy::new(RuntimeManager::new); From 7c71c301f29fc3a2ef54d6f4a36a2a36b5c049da Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Sun, 17 Nov 2024 15:41:20 +1100 Subject: [PATCH 2/8] c --- crates/polars-io/src/pl_async.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/polars-io/src/pl_async.rs b/crates/polars-io/src/pl_async.rs index 293d10cb3eea..4a71410fc709 100644 --- a/crates/polars-io/src/pl_async.rs +++ b/crates/polars-io/src/pl_async.rs @@ -327,6 +327,9 @@ impl RuntimeManager { O: Send + Sync + 'static, { if POOL.current_thread_index().is_some() { + if cfg!(debug_assertions) { + panic!("rayon was entered from an async context"); + } // We are a rayon thread, so we can't use POOL.spawn as it would mean we spawn a task and block until // another rayon thread executes it - we would deadlock if all rayon threads did this. // Safety: The tokio runtime flavor is multi-threaded. From e9a469327eb2c737483e7029f84a9f702c884890 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Sun, 17 Nov 2024 16:01:10 +1100 Subject: [PATCH 3/8] c --- crates/polars-io/src/pl_async.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/crates/polars-io/src/pl_async.rs b/crates/polars-io/src/pl_async.rs index 4a71410fc709..293d10cb3eea 100644 --- a/crates/polars-io/src/pl_async.rs +++ b/crates/polars-io/src/pl_async.rs @@ -327,9 +327,6 @@ impl RuntimeManager { O: Send + Sync + 'static, { if POOL.current_thread_index().is_some() { - if cfg!(debug_assertions) { - panic!("rayon was entered from an async context"); - } // We are a rayon thread, so we can't use POOL.spawn as it would mean we spawn a task and block until // another rayon thread executes it - we would deadlock if all rayon threads did this. // Safety: The tokio runtime flavor is multi-threaded. 
From ad2a5030343ee4acd7474895747db6a6e4533b22 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Sun, 17 Nov 2024 16:09:15 +1100 Subject: [PATCH 4/8] c --- .../polars-io/src/parquet/read/read_impl.rs | 54 +++++++++++-------- 1 file changed, 32 insertions(+), 22 deletions(-) diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs index 83cd0013592f..ac095a4a23b7 100644 --- a/crates/polars-io/src/parquet/read/read_impl.rs +++ b/crates/polars-io/src/parquet/read/read_impl.rs @@ -28,7 +28,6 @@ use crate::hive::materialize_hive_partitions; use crate::mmap::{MmapBytesReader, ReaderBytes}; use crate::parquet::metadata::FileMetadataRef; use crate::parquet::read::ROW_COUNT_OVERFLOW_ERR; -use crate::pl_async::get_runtime; use crate::predicates::{apply_predicate, PhysicalIoExpr}; use crate::utils::get_reader_bytes; use crate::utils::slice::split_slice_at_file; @@ -1199,27 +1198,38 @@ impl BatchedParquetReader { let hive_partition_columns = self.hive_partition_columns.clone(); let slice = self.slice; - let (dfs, rows_read) = get_runtime() - .spawn_rayon(move || { - let dfs = rg_to_dfs( - &store, - &mut rows_read, - row_group_range.start, - row_group_range.end, - slice, - &metadata, - &schema, - predicate.as_deref(), - row_index, - parallel, - &projection, - use_statistics, - hive_partition_columns.as_deref(), - ); - - dfs.map(|x| (x, rows_read)) - }) - .await?; + let func = move || { + let dfs = rg_to_dfs( + &store, + &mut rows_read, + row_group_range.start, + row_group_range.end, + slice, + &metadata, + &schema, + predicate.as_deref(), + row_index, + parallel, + &projection, + use_statistics, + hive_partition_columns.as_deref(), + ); + + dfs.map(|x| (x, rows_read)) + }; + + let (dfs, rows_read) = { + #[cfg(feature = "async")] + { + crate::pl_async::get_runtime().spawn_rayon(func).await? + } + #[cfg(not(feature = "async"))] + { + // Just call the function. 
Not much we can do, since async isn't a required + // feature for the BatchedParquetReader. + func() + } + }; self.rows_read = rows_read; dfs From 5f7ae838576ec847c418b051cf656a12446ce272 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Sun, 17 Nov 2024 16:12:41 +1100 Subject: [PATCH 5/8] c --- crates/polars-io/src/parquet/read/read_impl.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs index ac095a4a23b7..2fc77f099ec3 100644 --- a/crates/polars-io/src/parquet/read/read_impl.rs +++ b/crates/polars-io/src/parquet/read/read_impl.rs @@ -1221,15 +1221,15 @@ impl BatchedParquetReader { let (dfs, rows_read) = { #[cfg(feature = "async")] { - crate::pl_async::get_runtime().spawn_rayon(func).await? + crate::pl_async::get_runtime().spawn_rayon(func).await } #[cfg(not(feature = "async"))] { - // Just call the function. Not much we can do, since async isn't a required - // feature for the BatchedParquetReader. + // Just call the function - we don't have access to our native async runtime + // if this async is configured out. func() } - }; + }?; self.rows_read = rows_read; dfs From d99ca39a6da730cb0da6c1d4244d38c56be734af Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Sun, 17 Nov 2024 16:12:54 +1100 Subject: [PATCH 6/8] c --- crates/polars-io/src/parquet/read/read_impl.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs index 2fc77f099ec3..1ec8cea96560 100644 --- a/crates/polars-io/src/parquet/read/read_impl.rs +++ b/crates/polars-io/src/parquet/read/read_impl.rs @@ -1226,7 +1226,7 @@ impl BatchedParquetReader { #[cfg(not(feature = "async"))] { // Just call the function - we don't have access to our native async runtime - // if this async is configured out. + // if async is configured out. 
func() } }?; From 21268512931f336499c8761ddecfea4c5a206373 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Sun, 17 Nov 2024 16:13:11 +1100 Subject: [PATCH 7/8] c --- crates/polars-io/src/parquet/read/read_impl.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs index 1ec8cea96560..0e6cb20d5033 100644 --- a/crates/polars-io/src/parquet/read/read_impl.rs +++ b/crates/polars-io/src/parquet/read/read_impl.rs @@ -1225,8 +1225,8 @@ impl BatchedParquetReader { } #[cfg(not(feature = "async"))] { - // Just call the function - we don't have access to our native async runtime - // if async is configured out. + // Just call the function - we don't have access to pl_async if `async` is + // configured out. func() } }?; From a4e367fcdccd69192c4004f5bd9076afb0a74f67 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Sun, 17 Nov 2024 16:20:08 +1100 Subject: [PATCH 8/8] c --- crates/polars-io/src/parquet/read/read_impl.rs | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs index 0e6cb20d5033..884a9f8f5771 100644 --- a/crates/polars-io/src/parquet/read/read_impl.rs +++ b/crates/polars-io/src/parquet/read/read_impl.rs @@ -1152,6 +1152,7 @@ impl BatchedParquetReader { self.row_group_offset + n > self.n_row_groups } + #[cfg(feature = "async")] pub async fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> { if self.rows_read as usize == self.slice.0 + self.slice.1 && self.has_returned { return if self.chunks_fifo.is_empty() { @@ -1218,18 +1219,7 @@ impl BatchedParquetReader { dfs.map(|x| (x, rows_read)) }; - let (dfs, rows_read) = { - #[cfg(feature = "async")] - { - crate::pl_async::get_runtime().spawn_rayon(func).await - } - #[cfg(not(feature = "async"))] - { - // Just call the function - we don't have access to pl_async if `async` is - // 
configured out. - func() - } - }?; + let (dfs, rows_read) = crate::pl_async::get_runtime().spawn_rayon(func).await?; self.rows_read = rows_read; dfs