diff --git a/crates/polars-python/src/lazyframe/general.rs b/crates/polars-python/src/lazyframe/general.rs
index 3c835d063f77..90d283b337e5 100644
--- a/crates/polars-python/src/lazyframe/general.rs
+++ b/crates/polars-python/src/lazyframe/general.rs
@@ -1,450 +1,21 @@
 use std::collections::HashMap;
-use std::num::NonZeroUsize;
-use std::path::PathBuf;
 
-use polars::io::{HiveOptions, RowIndex};
 use polars::time::*;
 use polars_core::prelude::*;
-#[cfg(feature = "parquet")]
-use polars_parquet::arrow::write::StatisticsOptions;
-use polars_plan::plans::ScanSources;
 use pyo3::prelude::*;
 use pyo3::pybacked::PyBackedStr;
-use pyo3::types::{PyDict, PyList};
+use pyo3::types::PyDict;
 
 use super::PyLazyFrame;
 use crate::error::PyPolarsErr;
 use crate::expr::ToExprs;
-use crate::interop::arrow::to_rust::pyarrow_schema_to_rust;
 use crate::lazyframe::visit::NodeTraverser;
 use crate::prelude::*;
 use crate::{PyDataFrame, PyExpr, PyLazyGroupBy};
 
-fn pyobject_to_first_path_and_scan_sources(
-    obj: PyObject,
-) -> PyResult<(Option<PathBuf>, ScanSources)> {
-    use crate::file::{get_python_scan_source_input, PythonScanSourceInput};
-    Ok(match get_python_scan_source_input(obj, false)? {
-        PythonScanSourceInput::Path(path) => {
-            (Some(path.clone()), ScanSources::Paths([path].into()))
-        },
-        PythonScanSourceInput::File(file) => (None, ScanSources::Files([file].into())),
-        PythonScanSourceInput::Buffer(buff) => (None, ScanSources::Buffers([buff].into())),
-    })
-}
-
 #[pymethods]
 #[allow(clippy::should_implement_trait)]
 impl PyLazyFrame {
-    #[staticmethod]
-    #[cfg(feature = "json")]
-    #[allow(clippy::too_many_arguments)]
-    #[pyo3(signature = (
-        source, sources, infer_schema_length, schema, schema_overrides, batch_size, n_rows, low_memory, rechunk,
-        row_index, ignore_errors, include_file_paths, cloud_options, credential_provider, retries, file_cache_ttl
-    ))]
-    fn new_from_ndjson(
-        source: Option<PyObject>,
-        sources: Wrap<ScanSources>,
-        infer_schema_length: Option<usize>,
-        schema: Option<Wrap<Schema>>,
-        schema_overrides: Option<Wrap<Schema>>,
-        batch_size: Option<NonZeroUsize>,
-        n_rows: Option<usize>,
-        low_memory: bool,
-        rechunk: bool,
-        row_index: Option<(String, IdxSize)>,
-        ignore_errors: bool,
-        include_file_paths: Option<String>,
-        cloud_options: Option<Vec<(String, String)>>,
-        credential_provider: Option<PyObject>,
-        retries: usize,
-        file_cache_ttl: Option<u64>,
-    ) -> PyResult<Self> {
-        use cloud::credential_provider::PlCredentialProvider;
-        let row_index = row_index.map(|(name, offset)| RowIndex {
-            name: name.into(),
-            offset,
-        });
-
-        let sources = sources.0;
-        let (first_path, sources) = match source {
-            None => (sources.first_path().map(|p| p.to_path_buf()), sources),
-            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
-        };
-
-        let mut r = LazyJsonLineReader::new_with_sources(sources);
-
-        #[cfg(feature = "cloud")]
-        if let Some(first_path) = first_path {
-            let first_path_url = first_path.to_string_lossy();
-
-            let mut cloud_options =
-                parse_cloud_options(&first_path_url, cloud_options.unwrap_or_default())?;
-            cloud_options = cloud_options
-                .with_max_retries(retries)
-                .with_credential_provider(
-                    credential_provider.map(PlCredentialProvider::from_python_func_object),
-                );
-
-            if let Some(file_cache_ttl) = file_cache_ttl {
-                cloud_options.file_cache_ttl = file_cache_ttl;
-            }
-
-            r = r.with_cloud_options(Some(cloud_options));
-        };
-
-        let lf = r
-            .with_infer_schema_length(infer_schema_length.and_then(NonZeroUsize::new))
-            .with_batch_size(batch_size)
-            .with_n_rows(n_rows)
-            .low_memory(low_memory)
-            .with_rechunk(rechunk)
-            .with_schema(schema.map(|schema| Arc::new(schema.0)))
-
.with_schema_overwrite(schema_overrides.map(|x| Arc::new(x.0))) - .with_row_index(row_index) - .with_ignore_errors(ignore_errors) - .with_include_file_paths(include_file_paths.map(|x| x.into())) - .finish() - .map_err(PyPolarsErr::from)?; - - Ok(lf.into()) - } - - #[staticmethod] - #[cfg(feature = "csv")] - #[pyo3(signature = (source, sources, separator, has_header, ignore_errors, skip_rows, n_rows, cache, overwrite_dtype, - low_memory, comment_prefix, quote_char, null_values, missing_utf8_is_empty_string, - infer_schema_length, with_schema_modify, rechunk, skip_rows_after_header, - encoding, row_index, try_parse_dates, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, glob, schema, - cloud_options, credential_provider, retries, file_cache_ttl, include_file_paths - ) - )] - fn new_from_csv( - source: Option, - sources: Wrap, - separator: &str, - has_header: bool, - ignore_errors: bool, - skip_rows: usize, - n_rows: Option, - cache: bool, - overwrite_dtype: Option)>>, - low_memory: bool, - comment_prefix: Option<&str>, - quote_char: Option<&str>, - null_values: Option>, - missing_utf8_is_empty_string: bool, - infer_schema_length: Option, - with_schema_modify: Option, - rechunk: bool, - skip_rows_after_header: usize, - encoding: Wrap, - row_index: Option<(String, IdxSize)>, - try_parse_dates: bool, - eol_char: &str, - raise_if_empty: bool, - truncate_ragged_lines: bool, - decimal_comma: bool, - glob: bool, - schema: Option>, - cloud_options: Option>, - credential_provider: Option, - retries: usize, - file_cache_ttl: Option, - include_file_paths: Option, - ) -> PyResult { - use cloud::credential_provider::PlCredentialProvider; - - let null_values = null_values.map(|w| w.0); - let quote_char = quote_char.and_then(|s| s.as_bytes().first()).copied(); - let separator = separator - .as_bytes() - .first() - .ok_or_else(|| polars_err!(InvalidOperation: "`separator` cannot be empty")) - .copied() - .map_err(PyPolarsErr::from)?; - let eol_char = eol_char - .as_bytes() - .first() - .ok_or_else(|| polars_err!(InvalidOperation: "`eol_char` cannot be empty")) - .copied() - .map_err(PyPolarsErr::from)?; - let row_index = row_index.map(|(name, offset)| RowIndex { - name: name.into(), - offset, - }); - - let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| { - overwrite_dtype - .into_iter() - .map(|(name, dtype)| Field::new((&*name).into(), dtype.0)) - .collect::() - }); - - let sources = sources.0; - let (first_path, sources) = match source { - None => (sources.first_path().map(|p| p.to_path_buf()), sources), - Some(source) => pyobject_to_first_path_and_scan_sources(source)?, - }; - - let mut r = LazyCsvReader::new_with_sources(sources); - - #[cfg(feature = "cloud")] - if let Some(first_path) = first_path { - let first_path_url = first_path.to_string_lossy(); - - let mut cloud_options = - parse_cloud_options(&first_path_url, cloud_options.unwrap_or_default())?; - if let Some(file_cache_ttl) = file_cache_ttl { - cloud_options.file_cache_ttl = file_cache_ttl; - } - cloud_options = cloud_options - .with_max_retries(retries) - .with_credential_provider( - credential_provider.map(PlCredentialProvider::from_python_func_object), - ); - r = r.with_cloud_options(Some(cloud_options)); - } - - let mut r = r - .with_infer_schema_length(infer_schema_length) - .with_separator(separator) - .with_has_header(has_header) - .with_ignore_errors(ignore_errors) - .with_skip_rows(skip_rows) - .with_n_rows(n_rows) - .with_cache(cache) - .with_dtype_overwrite(overwrite_dtype.map(Arc::new)) - 
.with_schema(schema.map(|schema| Arc::new(schema.0))) - .with_low_memory(low_memory) - .with_comment_prefix(comment_prefix.map(|x| x.into())) - .with_quote_char(quote_char) - .with_eol_char(eol_char) - .with_rechunk(rechunk) - .with_skip_rows_after_header(skip_rows_after_header) - .with_encoding(encoding.0) - .with_row_index(row_index) - .with_try_parse_dates(try_parse_dates) - .with_null_values(null_values) - .with_missing_is_null(!missing_utf8_is_empty_string) - .with_truncate_ragged_lines(truncate_ragged_lines) - .with_decimal_comma(decimal_comma) - .with_glob(glob) - .with_raise_if_empty(raise_if_empty) - .with_include_file_paths(include_file_paths.map(|x| x.into())); - - if let Some(lambda) = with_schema_modify { - let f = |schema: Schema| { - let iter = schema.iter_names().map(|s| s.as_str()); - Python::with_gil(|py| { - let names = PyList::new_bound(py, iter); - - let out = lambda.call1(py, (names,)).expect("python function failed"); - let new_names = out - .extract::>(py) - .expect("python function should return List[str]"); - polars_ensure!(new_names.len() == schema.len(), - ShapeMismatch: "The length of the new names list should be equal to or less than the original column length", - ); - Ok(schema - .iter_values() - .zip(new_names) - .map(|(dtype, name)| Field::new(name.into(), dtype.clone())) - .collect()) - }) - }; - r = r.with_schema_modify(f).map_err(PyPolarsErr::from)? - } - - Ok(r.finish().map_err(PyPolarsErr::from)?.into()) - } - - #[cfg(feature = "parquet")] - #[staticmethod] - #[pyo3(signature = ( - source, sources, n_rows, cache, parallel, rechunk, row_index, low_memory, cloud_options, - credential_provider, use_statistics, hive_partitioning, schema, hive_schema, - try_parse_hive_dates, retries, glob, include_file_paths, allow_missing_columns, - ))] - fn new_from_parquet( - source: Option, - sources: Wrap, - n_rows: Option, - cache: bool, - parallel: Wrap, - rechunk: bool, - row_index: Option<(String, IdxSize)>, - low_memory: bool, - cloud_options: Option>, - credential_provider: Option, - use_statistics: bool, - hive_partitioning: Option, - schema: Option>, - hive_schema: Option>, - try_parse_hive_dates: bool, - retries: usize, - glob: bool, - include_file_paths: Option, - allow_missing_columns: bool, - ) -> PyResult { - use cloud::credential_provider::PlCredentialProvider; - - let parallel = parallel.0; - let hive_schema = hive_schema.map(|s| Arc::new(s.0)); - - let row_index = row_index.map(|(name, offset)| RowIndex { - name: name.into(), - offset, - }); - - let hive_options = HiveOptions { - enabled: hive_partitioning, - hive_start_idx: 0, - schema: hive_schema, - try_parse_dates: try_parse_hive_dates, - }; - - let mut args = ScanArgsParquet { - n_rows, - cache, - parallel, - rechunk, - row_index, - low_memory, - cloud_options: None, - use_statistics, - schema: schema.map(|x| Arc::new(x.0)), - hive_options, - glob, - include_file_paths: include_file_paths.map(|x| x.into()), - allow_missing_columns, - }; - - let sources = sources.0; - let (first_path, sources) = match source { - None => (sources.first_path().map(|p| p.to_path_buf()), sources), - Some(source) => pyobject_to_first_path_and_scan_sources(source)?, - }; - - #[cfg(feature = "cloud")] - if let Some(first_path) = first_path { - let first_path_url = first_path.to_string_lossy(); - let cloud_options = - parse_cloud_options(&first_path_url, cloud_options.unwrap_or_default())?; - args.cloud_options = Some( - cloud_options - .with_max_retries(retries) - .with_credential_provider( - 
credential_provider.map(PlCredentialProvider::from_python_func_object), - ), - ); - } - - let lf = LazyFrame::scan_parquet_sources(sources, args).map_err(PyPolarsErr::from)?; - - Ok(lf.into()) - } - - #[cfg(feature = "ipc")] - #[staticmethod] - #[pyo3(signature = ( - source, sources, n_rows, cache, rechunk, row_index, cloud_options,credential_provider, - hive_partitioning, hive_schema, try_parse_hive_dates, retries, file_cache_ttl, - include_file_paths - ))] - fn new_from_ipc( - source: Option, - sources: Wrap, - n_rows: Option, - cache: bool, - rechunk: bool, - row_index: Option<(String, IdxSize)>, - cloud_options: Option>, - credential_provider: Option, - hive_partitioning: Option, - hive_schema: Option>, - try_parse_hive_dates: bool, - retries: usize, - file_cache_ttl: Option, - include_file_paths: Option, - ) -> PyResult { - use cloud::credential_provider::PlCredentialProvider; - let row_index = row_index.map(|(name, offset)| RowIndex { - name: name.into(), - offset, - }); - - let hive_options = HiveOptions { - enabled: hive_partitioning, - hive_start_idx: 0, - schema: hive_schema.map(|x| Arc::new(x.0)), - try_parse_dates: try_parse_hive_dates, - }; - - let mut args = ScanArgsIpc { - n_rows, - cache, - rechunk, - row_index, - #[cfg(feature = "cloud")] - cloud_options: None, - hive_options, - include_file_paths: include_file_paths.map(|x| x.into()), - }; - - let sources = sources.0; - let (first_path, sources) = match source { - None => (sources.first_path().map(|p| p.to_path_buf()), sources), - Some(source) => pyobject_to_first_path_and_scan_sources(source)?, - }; - - #[cfg(feature = "cloud")] - if let Some(first_path) = first_path { - let first_path_url = first_path.to_string_lossy(); - - let mut cloud_options = - parse_cloud_options(&first_path_url, cloud_options.unwrap_or_default())?; - if let Some(file_cache_ttl) = file_cache_ttl { - cloud_options.file_cache_ttl = file_cache_ttl; - } - args.cloud_options = Some( - cloud_options - .with_max_retries(retries) - .with_credential_provider( - credential_provider.map(PlCredentialProvider::from_python_func_object), - ), - ); - } - - let lf = LazyFrame::scan_ipc_sources(sources, args).map_err(PyPolarsErr::from)?; - Ok(lf.into()) - } - - #[staticmethod] - fn scan_from_python_function_arrow_schema( - schema: &Bound<'_, PyList>, - scan_fn: PyObject, - pyarrow: bool, - ) -> PyResult { - let schema = pyarrow_schema_to_rust(schema)?; - Ok(LazyFrame::scan_from_python_function(schema, scan_fn, pyarrow).into()) - } - - #[staticmethod] - fn scan_from_python_function_pl_schema( - schema: Vec<(PyBackedStr, Wrap)>, - scan_fn: PyObject, - pyarrow: bool, - ) -> PyResult { - let schema = Schema::from_iter( - schema - .into_iter() - .map(|(name, dt)| Field::new((&*name).into(), dt.0)), - ); - Ok(LazyFrame::scan_from_python_function(schema, scan_fn, pyarrow).into()) - } - fn describe_plan(&self) -> PyResult { self.ldf .describe_plan() @@ -670,206 +241,6 @@ impl PyLazyFrame { }); } - #[cfg(all(feature = "streaming", feature = "parquet"))] - #[pyo3(signature = ( - path, compression, compression_level, statistics, row_group_size, data_page_size, - maintain_order, cloud_options, credential_provider, retries - ))] - fn sink_parquet( - &self, - py: Python, - path: PathBuf, - compression: &str, - compression_level: Option, - statistics: Wrap, - row_group_size: Option, - data_page_size: Option, - maintain_order: bool, - cloud_options: Option>, - credential_provider: Option, - retries: usize, - ) -> PyResult<()> { - let compression = 
parse_parquet_compression(compression, compression_level)?; - - let options = ParquetWriteOptions { - compression, - statistics: statistics.0, - row_group_size, - data_page_size, - maintain_order, - }; - - let cloud_options = { - let cloud_options = - parse_cloud_options(path.to_str().unwrap(), cloud_options.unwrap_or_default())?; - Some( - cloud_options - .with_max_retries(retries) - .with_credential_provider( - credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_func_object), - ), - ) - }; - - // if we don't allow threads and we have udfs trying to acquire the gil from different - // threads we deadlock. - py.allow_threads(|| { - let ldf = self.ldf.clone(); - ldf.sink_parquet(&path, options, cloud_options) - .map_err(PyPolarsErr::from) - })?; - Ok(()) - } - - #[cfg(all(feature = "streaming", feature = "ipc"))] - #[pyo3(signature = (path, compression, maintain_order, cloud_options, credential_provider, retries))] - fn sink_ipc( - &self, - py: Python, - path: PathBuf, - compression: Option>, - maintain_order: bool, - cloud_options: Option>, - credential_provider: Option, - retries: usize, - ) -> PyResult<()> { - let options = IpcWriterOptions { - compression: compression.map(|c| c.0), - maintain_order, - }; - - let cloud_options = { - let cloud_options = - parse_cloud_options(path.to_str().unwrap(), cloud_options.unwrap_or_default())?; - Some( - cloud_options - .with_max_retries(retries) - .with_credential_provider( - credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_func_object), - ), - ) - }; - - // if we don't allow threads and we have udfs trying to acquire the gil from different - // threads we deadlock. - py.allow_threads(|| { - let ldf = self.ldf.clone(); - ldf.sink_ipc(path, options, cloud_options) - .map_err(PyPolarsErr::from) - })?; - Ok(()) - } - - #[cfg(all(feature = "streaming", feature = "csv"))] - #[pyo3(signature = ( - path, include_bom, include_header, separator, line_terminator, quote_char, batch_size, - datetime_format, date_format, time_format, float_scientific, float_precision, null_value, - quote_style, maintain_order, cloud_options, credential_provider, retries - ))] - fn sink_csv( - &self, - py: Python, - path: PathBuf, - include_bom: bool, - include_header: bool, - separator: u8, - line_terminator: String, - quote_char: u8, - batch_size: NonZeroUsize, - datetime_format: Option, - date_format: Option, - time_format: Option, - float_scientific: Option, - float_precision: Option, - null_value: Option, - quote_style: Option>, - maintain_order: bool, - cloud_options: Option>, - credential_provider: Option, - retries: usize, - ) -> PyResult<()> { - let quote_style = quote_style.map_or(QuoteStyle::default(), |wrap| wrap.0); - let null_value = null_value.unwrap_or(SerializeOptions::default().null); - - let serialize_options = SerializeOptions { - date_format, - time_format, - datetime_format, - float_scientific, - float_precision, - separator, - quote_char, - null: null_value, - line_terminator, - quote_style, - }; - - let options = CsvWriterOptions { - include_bom, - include_header, - maintain_order, - batch_size, - serialize_options, - }; - - let cloud_options = { - let cloud_options = - parse_cloud_options(path.to_str().unwrap(), cloud_options.unwrap_or_default())?; - Some( - cloud_options - .with_max_retries(retries) - .with_credential_provider( - credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_func_object), - ), 
- ) - }; - - // if we don't allow threads and we have udfs trying to acquire the gil from different - // threads we deadlock. - py.allow_threads(|| { - let ldf = self.ldf.clone(); - ldf.sink_csv(path, options, cloud_options) - .map_err(PyPolarsErr::from) - })?; - Ok(()) - } - - #[allow(clippy::too_many_arguments)] - #[cfg(all(feature = "streaming", feature = "json"))] - #[pyo3(signature = (path, maintain_order, cloud_options, credential_provider, retries))] - fn sink_json( - &self, - py: Python, - path: PathBuf, - maintain_order: bool, - cloud_options: Option>, - credential_provider: Option, - retries: usize, - ) -> PyResult<()> { - let options = JsonWriterOptions { maintain_order }; - - let cloud_options = { - let cloud_options = - parse_cloud_options(path.to_str().unwrap(), cloud_options.unwrap_or_default())?; - Some( - cloud_options - .with_max_retries(retries) - .with_credential_provider( - credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_func_object), - ), - ) - }; - - // if we don't allow threads and we have udfs trying to acquire the gil from different - // threads we deadlock. - py.allow_threads(|| { - let ldf = self.ldf.clone(); - ldf.sink_json(path, options, cloud_options) - .map_err(PyPolarsErr::from) - })?; - Ok(()) - } - fn fetch(&self, py: Python, n_rows: usize) -> PyResult { let ldf = self.ldf.clone(); let df = py.allow_threads(|| ldf.fetch(n_rows).map_err(PyPolarsErr::from))?; diff --git a/crates/polars-python/src/lazyframe/io.rs b/crates/polars-python/src/lazyframe/io.rs new file mode 100644 index 000000000000..bdb4dff52e54 --- /dev/null +++ b/crates/polars-python/src/lazyframe/io.rs @@ -0,0 +1,642 @@ +use std::num::NonZeroUsize; +use std::path::PathBuf; + +use polars::io::{HiveOptions, RowIndex}; +use polars_core::prelude::*; +#[cfg(feature = "parquet")] +use polars_parquet::arrow::write::StatisticsOptions; +use polars_plan::plans::ScanSources; +use pyo3::prelude::*; +use pyo3::pybacked::PyBackedStr; +use pyo3::types::PyList; + +use super::PyLazyFrame; +use crate::error::PyPolarsErr; +use crate::interop::arrow::to_rust::pyarrow_schema_to_rust; +use crate::prelude::*; + +fn pyobject_to_first_path_and_scan_sources( + obj: PyObject, +) -> PyResult<(Option, ScanSources)> { + use crate::file::{get_python_scan_source_input, PythonScanSourceInput}; + Ok(match get_python_scan_source_input(obj, false)? 
{ + PythonScanSourceInput::Path(path) => { + (Some(path.clone()), ScanSources::Paths([path].into())) + }, + PythonScanSourceInput::File(file) => (None, ScanSources::Files([file].into())), + PythonScanSourceInput::Buffer(buff) => (None, ScanSources::Buffers([buff].into())), + }) +} + +#[pymethods] +#[allow(clippy::should_implement_trait)] +impl PyLazyFrame { + #[staticmethod] + #[cfg(feature = "json")] + #[allow(clippy::too_many_arguments)] + #[pyo3(signature = ( + source, sources, infer_schema_length, schema, schema_overrides, batch_size, n_rows, low_memory, rechunk, + row_index, ignore_errors, include_file_paths, cloud_options, credential_provider, retries, file_cache_ttl + ))] + fn new_from_ndjson( + source: Option, + sources: Wrap, + infer_schema_length: Option, + schema: Option>, + schema_overrides: Option>, + batch_size: Option, + n_rows: Option, + low_memory: bool, + rechunk: bool, + row_index: Option<(String, IdxSize)>, + ignore_errors: bool, + include_file_paths: Option, + cloud_options: Option>, + credential_provider: Option, + retries: usize, + file_cache_ttl: Option, + ) -> PyResult { + use cloud::credential_provider::PlCredentialProvider; + let row_index = row_index.map(|(name, offset)| RowIndex { + name: name.into(), + offset, + }); + + let sources = sources.0; + let (first_path, sources) = match source { + None => (sources.first_path().map(|p| p.to_path_buf()), sources), + Some(source) => pyobject_to_first_path_and_scan_sources(source)?, + }; + + let mut r = LazyJsonLineReader::new_with_sources(sources); + + #[cfg(feature = "cloud")] + if let Some(first_path) = first_path { + let first_path_url = first_path.to_string_lossy(); + + let mut cloud_options = + parse_cloud_options(&first_path_url, cloud_options.unwrap_or_default())?; + cloud_options = cloud_options + .with_max_retries(retries) + .with_credential_provider( + credential_provider.map(PlCredentialProvider::from_python_func_object), + ); + + if let Some(file_cache_ttl) = file_cache_ttl { + cloud_options.file_cache_ttl = file_cache_ttl; + } + + r = r.with_cloud_options(Some(cloud_options)); + }; + + let lf = r + .with_infer_schema_length(infer_schema_length.and_then(NonZeroUsize::new)) + .with_batch_size(batch_size) + .with_n_rows(n_rows) + .low_memory(low_memory) + .with_rechunk(rechunk) + .with_schema(schema.map(|schema| Arc::new(schema.0))) + .with_schema_overwrite(schema_overrides.map(|x| Arc::new(x.0))) + .with_row_index(row_index) + .with_ignore_errors(ignore_errors) + .with_include_file_paths(include_file_paths.map(|x| x.into())) + .finish() + .map_err(PyPolarsErr::from)?; + + Ok(lf.into()) + } + + #[staticmethod] + #[cfg(feature = "csv")] + #[pyo3(signature = (source, sources, separator, has_header, ignore_errors, skip_rows, n_rows, cache, overwrite_dtype, + low_memory, comment_prefix, quote_char, null_values, missing_utf8_is_empty_string, + infer_schema_length, with_schema_modify, rechunk, skip_rows_after_header, + encoding, row_index, try_parse_dates, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, glob, schema, + cloud_options, credential_provider, retries, file_cache_ttl, include_file_paths + ) + )] + fn new_from_csv( + source: Option, + sources: Wrap, + separator: &str, + has_header: bool, + ignore_errors: bool, + skip_rows: usize, + n_rows: Option, + cache: bool, + overwrite_dtype: Option)>>, + low_memory: bool, + comment_prefix: Option<&str>, + quote_char: Option<&str>, + null_values: Option>, + missing_utf8_is_empty_string: bool, + infer_schema_length: Option, + with_schema_modify: Option, 
+ rechunk: bool, + skip_rows_after_header: usize, + encoding: Wrap, + row_index: Option<(String, IdxSize)>, + try_parse_dates: bool, + eol_char: &str, + raise_if_empty: bool, + truncate_ragged_lines: bool, + decimal_comma: bool, + glob: bool, + schema: Option>, + cloud_options: Option>, + credential_provider: Option, + retries: usize, + file_cache_ttl: Option, + include_file_paths: Option, + ) -> PyResult { + use cloud::credential_provider::PlCredentialProvider; + + let null_values = null_values.map(|w| w.0); + let quote_char = quote_char.and_then(|s| s.as_bytes().first()).copied(); + let separator = separator + .as_bytes() + .first() + .ok_or_else(|| polars_err!(InvalidOperation: "`separator` cannot be empty")) + .copied() + .map_err(PyPolarsErr::from)?; + let eol_char = eol_char + .as_bytes() + .first() + .ok_or_else(|| polars_err!(InvalidOperation: "`eol_char` cannot be empty")) + .copied() + .map_err(PyPolarsErr::from)?; + let row_index = row_index.map(|(name, offset)| RowIndex { + name: name.into(), + offset, + }); + + let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| { + overwrite_dtype + .into_iter() + .map(|(name, dtype)| Field::new((&*name).into(), dtype.0)) + .collect::() + }); + + let sources = sources.0; + let (first_path, sources) = match source { + None => (sources.first_path().map(|p| p.to_path_buf()), sources), + Some(source) => pyobject_to_first_path_and_scan_sources(source)?, + }; + + let mut r = LazyCsvReader::new_with_sources(sources); + + #[cfg(feature = "cloud")] + if let Some(first_path) = first_path { + let first_path_url = first_path.to_string_lossy(); + + let mut cloud_options = + parse_cloud_options(&first_path_url, cloud_options.unwrap_or_default())?; + if let Some(file_cache_ttl) = file_cache_ttl { + cloud_options.file_cache_ttl = file_cache_ttl; + } + cloud_options = cloud_options + .with_max_retries(retries) + .with_credential_provider( + credential_provider.map(PlCredentialProvider::from_python_func_object), + ); + r = r.with_cloud_options(Some(cloud_options)); + } + + let mut r = r + .with_infer_schema_length(infer_schema_length) + .with_separator(separator) + .with_has_header(has_header) + .with_ignore_errors(ignore_errors) + .with_skip_rows(skip_rows) + .with_n_rows(n_rows) + .with_cache(cache) + .with_dtype_overwrite(overwrite_dtype.map(Arc::new)) + .with_schema(schema.map(|schema| Arc::new(schema.0))) + .with_low_memory(low_memory) + .with_comment_prefix(comment_prefix.map(|x| x.into())) + .with_quote_char(quote_char) + .with_eol_char(eol_char) + .with_rechunk(rechunk) + .with_skip_rows_after_header(skip_rows_after_header) + .with_encoding(encoding.0) + .with_row_index(row_index) + .with_try_parse_dates(try_parse_dates) + .with_null_values(null_values) + .with_missing_is_null(!missing_utf8_is_empty_string) + .with_truncate_ragged_lines(truncate_ragged_lines) + .with_decimal_comma(decimal_comma) + .with_glob(glob) + .with_raise_if_empty(raise_if_empty) + .with_include_file_paths(include_file_paths.map(|x| x.into())); + + if let Some(lambda) = with_schema_modify { + let f = |schema: Schema| { + let iter = schema.iter_names().map(|s| s.as_str()); + Python::with_gil(|py| { + let names = PyList::new_bound(py, iter); + + let out = lambda.call1(py, (names,)).expect("python function failed"); + let new_names = out + .extract::>(py) + .expect("python function should return List[str]"); + polars_ensure!(new_names.len() == schema.len(), + ShapeMismatch: "The length of the new names list should be equal to or less than the original column length", + ); + 
Ok(schema + .iter_values() + .zip(new_names) + .map(|(dtype, name)| Field::new(name.into(), dtype.clone())) + .collect()) + }) + }; + r = r.with_schema_modify(f).map_err(PyPolarsErr::from)? + } + + Ok(r.finish().map_err(PyPolarsErr::from)?.into()) + } + + #[cfg(feature = "parquet")] + #[staticmethod] + #[pyo3(signature = ( + source, sources, n_rows, cache, parallel, rechunk, row_index, low_memory, cloud_options, + credential_provider, use_statistics, hive_partitioning, schema, hive_schema, + try_parse_hive_dates, retries, glob, include_file_paths, allow_missing_columns, + ))] + fn new_from_parquet( + source: Option, + sources: Wrap, + n_rows: Option, + cache: bool, + parallel: Wrap, + rechunk: bool, + row_index: Option<(String, IdxSize)>, + low_memory: bool, + cloud_options: Option>, + credential_provider: Option, + use_statistics: bool, + hive_partitioning: Option, + schema: Option>, + hive_schema: Option>, + try_parse_hive_dates: bool, + retries: usize, + glob: bool, + include_file_paths: Option, + allow_missing_columns: bool, + ) -> PyResult { + use cloud::credential_provider::PlCredentialProvider; + + let parallel = parallel.0; + let hive_schema = hive_schema.map(|s| Arc::new(s.0)); + + let row_index = row_index.map(|(name, offset)| RowIndex { + name: name.into(), + offset, + }); + + let hive_options = HiveOptions { + enabled: hive_partitioning, + hive_start_idx: 0, + schema: hive_schema, + try_parse_dates: try_parse_hive_dates, + }; + + let mut args = ScanArgsParquet { + n_rows, + cache, + parallel, + rechunk, + row_index, + low_memory, + cloud_options: None, + use_statistics, + schema: schema.map(|x| Arc::new(x.0)), + hive_options, + glob, + include_file_paths: include_file_paths.map(|x| x.into()), + allow_missing_columns, + }; + + let sources = sources.0; + let (first_path, sources) = match source { + None => (sources.first_path().map(|p| p.to_path_buf()), sources), + Some(source) => pyobject_to_first_path_and_scan_sources(source)?, + }; + + #[cfg(feature = "cloud")] + if let Some(first_path) = first_path { + let first_path_url = first_path.to_string_lossy(); + let cloud_options = + parse_cloud_options(&first_path_url, cloud_options.unwrap_or_default())?; + args.cloud_options = Some( + cloud_options + .with_max_retries(retries) + .with_credential_provider( + credential_provider.map(PlCredentialProvider::from_python_func_object), + ), + ); + } + + let lf = LazyFrame::scan_parquet_sources(sources, args).map_err(PyPolarsErr::from)?; + + Ok(lf.into()) + } + + #[cfg(feature = "ipc")] + #[staticmethod] + #[pyo3(signature = ( + source, sources, n_rows, cache, rechunk, row_index, cloud_options,credential_provider, + hive_partitioning, hive_schema, try_parse_hive_dates, retries, file_cache_ttl, + include_file_paths + ))] + fn new_from_ipc( + source: Option, + sources: Wrap, + n_rows: Option, + cache: bool, + rechunk: bool, + row_index: Option<(String, IdxSize)>, + cloud_options: Option>, + credential_provider: Option, + hive_partitioning: Option, + hive_schema: Option>, + try_parse_hive_dates: bool, + retries: usize, + file_cache_ttl: Option, + include_file_paths: Option, + ) -> PyResult { + use cloud::credential_provider::PlCredentialProvider; + let row_index = row_index.map(|(name, offset)| RowIndex { + name: name.into(), + offset, + }); + + let hive_options = HiveOptions { + enabled: hive_partitioning, + hive_start_idx: 0, + schema: hive_schema.map(|x| Arc::new(x.0)), + try_parse_dates: try_parse_hive_dates, + }; + + let mut args = ScanArgsIpc { + n_rows, + cache, + rechunk, + row_index, 
+ #[cfg(feature = "cloud")] + cloud_options: None, + hive_options, + include_file_paths: include_file_paths.map(|x| x.into()), + }; + + let sources = sources.0; + let (first_path, sources) = match source { + None => (sources.first_path().map(|p| p.to_path_buf()), sources), + Some(source) => pyobject_to_first_path_and_scan_sources(source)?, + }; + + #[cfg(feature = "cloud")] + if let Some(first_path) = first_path { + let first_path_url = first_path.to_string_lossy(); + + let mut cloud_options = + parse_cloud_options(&first_path_url, cloud_options.unwrap_or_default())?; + if let Some(file_cache_ttl) = file_cache_ttl { + cloud_options.file_cache_ttl = file_cache_ttl; + } + args.cloud_options = Some( + cloud_options + .with_max_retries(retries) + .with_credential_provider( + credential_provider.map(PlCredentialProvider::from_python_func_object), + ), + ); + } + + let lf = LazyFrame::scan_ipc_sources(sources, args).map_err(PyPolarsErr::from)?; + Ok(lf.into()) + } + + #[staticmethod] + fn scan_from_python_function_arrow_schema( + schema: &Bound<'_, PyList>, + scan_fn: PyObject, + pyarrow: bool, + ) -> PyResult { + let schema = pyarrow_schema_to_rust(schema)?; + Ok(LazyFrame::scan_from_python_function(schema, scan_fn, pyarrow).into()) + } + + #[staticmethod] + fn scan_from_python_function_pl_schema( + schema: Vec<(PyBackedStr, Wrap)>, + scan_fn: PyObject, + pyarrow: bool, + ) -> PyResult { + let schema = Schema::from_iter( + schema + .into_iter() + .map(|(name, dt)| Field::new((&*name).into(), dt.0)), + ); + Ok(LazyFrame::scan_from_python_function(schema, scan_fn, pyarrow).into()) + } + + #[cfg(all(feature = "streaming", feature = "parquet"))] + #[pyo3(signature = ( + path, compression, compression_level, statistics, row_group_size, data_page_size, + maintain_order, cloud_options, credential_provider, retries + ))] + fn sink_parquet( + &self, + py: Python, + path: PathBuf, + compression: &str, + compression_level: Option, + statistics: Wrap, + row_group_size: Option, + data_page_size: Option, + maintain_order: bool, + cloud_options: Option>, + credential_provider: Option, + retries: usize, + ) -> PyResult<()> { + let compression = parse_parquet_compression(compression, compression_level)?; + + let options = ParquetWriteOptions { + compression, + statistics: statistics.0, + row_group_size, + data_page_size, + maintain_order, + }; + + let cloud_options = { + let cloud_options = + parse_cloud_options(path.to_str().unwrap(), cloud_options.unwrap_or_default())?; + Some( + cloud_options + .with_max_retries(retries) + .with_credential_provider( + credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_func_object), + ), + ) + }; + + // if we don't allow threads and we have udfs trying to acquire the gil from different + // threads we deadlock. 
+ py.allow_threads(|| { + let ldf = self.ldf.clone(); + ldf.sink_parquet(&path, options, cloud_options) + .map_err(PyPolarsErr::from) + })?; + Ok(()) + } + + #[cfg(all(feature = "streaming", feature = "ipc"))] + #[pyo3(signature = (path, compression, maintain_order, cloud_options, credential_provider, retries))] + fn sink_ipc( + &self, + py: Python, + path: PathBuf, + compression: Option>, + maintain_order: bool, + cloud_options: Option>, + credential_provider: Option, + retries: usize, + ) -> PyResult<()> { + let options = IpcWriterOptions { + compression: compression.map(|c| c.0), + maintain_order, + }; + + let cloud_options = { + let cloud_options = + parse_cloud_options(path.to_str().unwrap(), cloud_options.unwrap_or_default())?; + Some( + cloud_options + .with_max_retries(retries) + .with_credential_provider( + credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_func_object), + ), + ) + }; + + // if we don't allow threads and we have udfs trying to acquire the gil from different + // threads we deadlock. + py.allow_threads(|| { + let ldf = self.ldf.clone(); + ldf.sink_ipc(path, options, cloud_options) + .map_err(PyPolarsErr::from) + })?; + Ok(()) + } + + #[cfg(all(feature = "streaming", feature = "csv"))] + #[pyo3(signature = ( + path, include_bom, include_header, separator, line_terminator, quote_char, batch_size, + datetime_format, date_format, time_format, float_scientific, float_precision, null_value, + quote_style, maintain_order, cloud_options, credential_provider, retries + ))] + fn sink_csv( + &self, + py: Python, + path: PathBuf, + include_bom: bool, + include_header: bool, + separator: u8, + line_terminator: String, + quote_char: u8, + batch_size: NonZeroUsize, + datetime_format: Option, + date_format: Option, + time_format: Option, + float_scientific: Option, + float_precision: Option, + null_value: Option, + quote_style: Option>, + maintain_order: bool, + cloud_options: Option>, + credential_provider: Option, + retries: usize, + ) -> PyResult<()> { + let quote_style = quote_style.map_or(QuoteStyle::default(), |wrap| wrap.0); + let null_value = null_value.unwrap_or(SerializeOptions::default().null); + + let serialize_options = SerializeOptions { + date_format, + time_format, + datetime_format, + float_scientific, + float_precision, + separator, + quote_char, + null: null_value, + line_terminator, + quote_style, + }; + + let options = CsvWriterOptions { + include_bom, + include_header, + maintain_order, + batch_size, + serialize_options, + }; + + let cloud_options = { + let cloud_options = + parse_cloud_options(path.to_str().unwrap(), cloud_options.unwrap_or_default())?; + Some( + cloud_options + .with_max_retries(retries) + .with_credential_provider( + credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_func_object), + ), + ) + }; + + // if we don't allow threads and we have udfs trying to acquire the gil from different + // threads we deadlock. 
+        py.allow_threads(|| {
+            let ldf = self.ldf.clone();
+            ldf.sink_csv(path, options, cloud_options)
+                .map_err(PyPolarsErr::from)
+        })?;
+        Ok(())
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    #[cfg(all(feature = "streaming", feature = "json"))]
+    #[pyo3(signature = (path, maintain_order, cloud_options, credential_provider, retries))]
+    fn sink_json(
+        &self,
+        py: Python,
+        path: PathBuf,
+        maintain_order: bool,
+        cloud_options: Option<Vec<(String, String)>>,
+        credential_provider: Option<PyObject>,
+        retries: usize,
+    ) -> PyResult<()> {
+        let options = JsonWriterOptions { maintain_order };
+
+        let cloud_options = {
+            let cloud_options =
+                parse_cloud_options(path.to_str().unwrap(), cloud_options.unwrap_or_default())?;
+            Some(
+                cloud_options
+                    .with_max_retries(retries)
+                    .with_credential_provider(
+                        credential_provider.map(polars::prelude::cloud::credential_provider::PlCredentialProvider::from_python_func_object),
+                    ),
+            )
+        };
+
+        // if we don't allow threads and we have udfs trying to acquire the gil from different
+        // threads we deadlock.
+        py.allow_threads(|| {
+            let ldf = self.ldf.clone();
+            ldf.sink_json(path, options, cloud_options)
+                .map_err(PyPolarsErr::from)
+        })?;
+        Ok(())
+    }
+}
diff --git a/crates/polars-python/src/lazyframe/mod.rs b/crates/polars-python/src/lazyframe/mod.rs
index 85433332e415..9bdc43540662 100644
--- a/crates/polars-python/src/lazyframe/mod.rs
+++ b/crates/polars-python/src/lazyframe/mod.rs
@@ -1,6 +1,7 @@
 mod exitable;
 #[cfg(feature = "pymethods")]
 mod general;
+mod io;
 #[cfg(feature = "pymethods")]
 mod serde;
 pub mod visit;
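
Note on the structure above: the mod.rs hunk registers the extracted module, and the scan/sink methods move into a second `#[pymethods] impl PyLazyFrame` block in io.rs while general.rs keeps the rest. The reduced sketch below shows that shape with a hypothetical `PyThing` type standing in for `PyLazyFrame`; it assumes pyo3's `multiple-pymethods` feature is enabled in the crate's Cargo.toml, which this diff does not show.

// Sketch only: one #[pyclass] whose methods come from two #[pymethods]
// blocks, mirroring the general.rs / io.rs split. `PyThing` is hypothetical;
// more than one #[pymethods] block per type assumes pyo3's
// `multiple-pymethods` feature.
use pyo3::prelude::*;

#[pyclass]
struct PyThing {
    value: u64,
}

// Would live in one module (cf. general.rs).
#[pymethods]
impl PyThing {
    #[new]
    fn new(value: u64) -> Self {
        Self { value }
    }
}

// Would live in a sibling module (cf. io.rs); same class, extra methods.
#[pymethods]
impl PyThing {
    fn describe(&self) -> String {
        format!("PyThing(value={})", self.value)
    }
}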
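Note on the repeated `py.allow_threads` wrapper in the sink_* methods: as the inline comments say, holding the GIL while engine worker threads run Python UDFs can deadlock, so the blocking call runs with the GIL released. A minimal sketch of that pattern, with a hypothetical `run_engine` standing in for a call like `ldf.sink_parquet(...)`:

// Sketch of the GIL-release pattern used by the sink_* methods above.
// `run_engine` is a hypothetical stand-in for a blocking Polars engine call.
use pyo3::exceptions::PyRuntimeError;
use pyo3::prelude::*;

fn run_engine() -> Result<(), String> {
    // placeholder for the long-running, possibly multi-threaded engine work
    Ok(())
}

#[pyfunction]
fn sink_example(py: Python<'_>) -> PyResult<()> {
    // Release the GIL for the duration of the engine call so worker threads
    // that need to call back into Python (e.g. UDFs) can acquire it.
    py.allow_threads(|| run_engine().map_err(PyRuntimeError::new_err))?;
    Ok(())
}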