diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml index fe9e5e671..130e838db 100644 --- a/.github/workflows/python.yml +++ b/.github/workflows/python.yml @@ -35,6 +35,33 @@ jobs: - name: Test run: cargo test + emscripten: + name: Build pyodide wheel + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - run: pip install pyodide-build + - name: Get Emscripten and Python version info + shell: bash + run: | + echo EMSCRIPTEN_VERSION=$(pyodide config get emscripten_version) >> $GITHUB_ENV + echo PYTHON_VERSION=$(pyodide config get python_version | cut -d '.' -f 1-2) >> $GITHUB_ENV + pip uninstall -y pyodide-build + - uses: mymindstorm/setup-emsdk@v14 + with: + version: ${{ env.EMSCRIPTEN_VERSION }} + actions-cache-folder: emsdk-cache + - uses: actions/setup-python@v5 + with: + python-version: ${{ env.PYTHON_VERSION }} + - run: pip install pyodide-build + - name: Build wheels + uses: PyO3/maturin-action@v1 + with: + target: wasm32-unknown-emscripten + args: --no-default-features --out dist -m python/core/Cargo.toml + rust-toolchain: nightly + # lint-python: # name: Lint Python code # runs-on: ubuntu-latest diff --git a/.gitignore b/.gitignore index da30877e8..775196f54 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +emsdk vcpkg vcpkg_installed .pyodide* diff --git a/python/core/.gitignore b/python/core/.gitignore new file mode 100644 index 000000000..41fb4c0c2 --- /dev/null +++ b/python/core/.gitignore @@ -0,0 +1 @@ +emsdk/ diff --git a/python/core/Cargo.lock b/python/core/Cargo.lock index a212d2e9b..cf16d6981 100644 --- a/python/core/Cargo.lock +++ b/python/core/Cargo.lock @@ -1444,12 +1444,6 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" -[[package]] -name = "inventory" -version = "0.3.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"f958d3d68f4167080a18141e10381e7634563984a537f2a49a30fd8e53ac5767" - [[package]] name = "ipnet" version = "2.9.0" @@ -2234,7 +2228,6 @@ dependencies = [ "hashbrown", "indexmap", "indoc", - "inventory", "libc", "memoffset", "parking_lot", @@ -2248,8 +2241,9 @@ dependencies = [ [[package]] name = "pyo3-arrow" -version = "0.1.0" -source = "git+https://github.com/kylebarron/arro3?rev=d0d737a03c141ff316e3e354d85828edb42338d4#d0d737a03c141ff316e3e354d85828edb42338d4" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bc9742f9022bfbeb9c82f4d3d6437dea55aff2d885950c813084ac712569d3a" dependencies = [ "arrow", "arrow-array", diff --git a/python/core/Cargo.toml b/python/core/Cargo.toml index a630dae70..1fa45b5d6 100644 --- a/python/core/Cargo.toml +++ b/python/core/Cargo.toml @@ -16,45 +16,64 @@ rust-version = "1.80" name = "_rust" crate-type = ["cdylib"] +[features] +default = ["async", "libc", "rayon"] +async = [ + "dep:futures", + "dep:object_store", + "parquet/object_store", + "dep:pyo3-asyncio-0-21", + "geoarrow/flatgeobuf_async", + "geoarrow/parquet_async", + "geoarrow/postgis", + "dep:sqlx", + "dep:tokio", +] +libc = ["geoarrow/polylabel"] +rayon = ["geoarrow/rayon"] + [dependencies] arrow = "52" arrow-array = "52" arrow-buffer = "52" bytes = "1" flatgeobuf = { version = "4.2.0", default-features = false } -futures = "0.3" -object_store = { version = "0.10", features = ["aws", "azure", "gcp", "http"] } -parquet = { version = "52", features = ["object_store"] } +futures = { version = "0.3", optional = true } +object_store = { version = "0.10", features = [ + "aws", + "azure", + "gcp", + "http", +], optional = true } +parquet = "52" pyo3 = { version = "0.21.0", features = [ "abi3-py38", - "multiple-pymethods", "hashbrown", "serde", "anyhow", ] } -pyo3-arrow = { git = "https://github.com/kylebarron/arro3", rev = "d0d737a03c141ff316e3e354d85828edb42338d4" } -pyo3-asyncio-0-21 = { version = "0.21", features = ["tokio-runtime"] 
} +pyo3-arrow = "0.2" +pyo3-asyncio-0-21 = { version = "0.21", features = [ + "tokio-runtime", +], optional = true } pythonize = "0.21" geo = "0.28" geoarrow = { path = "../../", features = [ "csv", - "flatgeobuf_async", "flatgeobuf", "geozero", "ipc_compression", - "parquet_async", "parquet_compression", "parquet", - "polylabel", - "postgis", - "rayon", ] } geozero = { version = "0.13", features = ["with-svg"] } numpy = "0.21" serde_json = "1" -sqlx = { version = "0.7", default-features = false, features = ["postgres"] } +sqlx = { version = "0.7", default-features = false, features = [ + "postgres", +], optional = true } thiserror = "1" -tokio = { version = "1.9", features = ["rt"] } +tokio = { version = "1.9", features = ["rt"], optional = true } url = "2.5" # reqwest is pulled in by object store, but not used by python binding itself diff --git a/python/core/DEVELOP.md b/python/core/DEVELOP.md new file mode 100644 index 000000000..9133677f3 --- /dev/null +++ b/python/core/DEVELOP.md @@ -0,0 +1,42 @@ +## Pyodide + + +Install rust nightly and add wasm toolchain + +``` +rustup toolchain install nightly +rustup target add --toolchain nightly wasm32-unknown-emscripten +``` + +Install dependencies. You need to set the `pyodide-build` version to the same version as the `pyodide` release you distribute for. + +``` +pip install -U maturin +pip install pyodide-build +``` + +Install emsdk. + +``` +git clone https://github.com/emscripten-core/emsdk.git +cd emsdk +PYODIDE_EMSCRIPTEN_VERSION=$(pyodide config get emscripten_version) +./emsdk install ${PYODIDE_EMSCRIPTEN_VERSION} +./emsdk activate ${PYODIDE_EMSCRIPTEN_VERSION} +source emsdk_env.sh +cd .. +``` + +- The `RUSTFLAGS` is temporary to get around this compiler bug. +- You must use rust nightly +- You must use `--no-default-features` to remove any async support. `tokio` does not compile for emscripten. 
+ +```bash +RUSTFLAGS='-Zinline-mir=no' \ + RUSTUP_TOOLCHAIN=nightly \ + maturin build \ + --no-default-features \ + --release \ + -o dist \ + --target wasm32-unknown-emscripten +``` diff --git a/python/core/src/algorithm/geo/area.rs b/python/core/src/algorithm/geo/area.rs index 813aae5e0..57c346c46 100644 --- a/python/core/src/algorithm/geo/area.rs +++ b/python/core/src/algorithm/geo/area.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::error::PyGeoArrowResult; use crate::ffi::from_python::AnyGeometryInput; use geoarrow::algorithm::geo::{Area, ChamberlainDuquetteArea, GeodesicArea}; @@ -38,7 +40,7 @@ pub fn area(py: Python, input: AnyGeometryInput, method: AreaMethod) -> PyGeoArr AreaMethod::Euclidean => arr.as_ref().unsigned_area()?, AreaMethod::Geodesic => arr.as_ref().geodesic_area_unsigned()?, }; - Ok(PyArray::from_array(out).to_arro3(py)?) + Ok(PyArray::from_array_ref(Arc::new(out)).to_arro3(py)?) } AnyGeometryInput::Chunked(arr) => { let out = match method { AreaMethod::Euclidean => arr.as_ref().unsigned_area()?, AreaMethod::Geodesic => arr.as_ref().geodesic_area_unsigned()?, }; - Ok(PyChunkedArray::from_arrays(out.chunks())?.to_arro3(py)?) + Ok(PyChunkedArray::from_array_refs(out.chunk_refs())?.to_arro3(py)?) } } } @@ -72,7 +74,7 @@ pub fn signed_area( AreaMethod::Euclidean => arr.as_ref().signed_area()?, AreaMethod::Geodesic => arr.as_ref().geodesic_area_signed()?, }; - Ok(PyArray::from_array(out).to_arro3(py)?) + Ok(PyArray::from_array_ref(Arc::new(out)).to_arro3(py)?) } AnyGeometryInput::Chunked(arr) => { let out = match method { AreaMethod::Euclidean => arr.as_ref().signed_area()?, AreaMethod::Geodesic => arr.as_ref().geodesic_area_signed()?, }; - Ok(PyChunkedArray::from_arrays(out.chunks())?.to_arro3(py)?) + Ok(PyChunkedArray::from_array_refs(out.chunk_refs())?.to_arro3(py)?) 
} } } diff --git a/python/core/src/algorithm/geo/dimensions.rs b/python/core/src/algorithm/geo/dimensions.rs index 4b116625b..aa6c5ee66 100644 --- a/python/core/src/algorithm/geo/dimensions.rs +++ b/python/core/src/algorithm/geo/dimensions.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::error::PyGeoArrowResult; use crate::ffi::from_python::AnyGeometryInput; use geoarrow::algorithm::geo::HasDimensions; @@ -9,11 +11,11 @@ pub fn is_empty(py: Python, input: AnyGeometryInput) -> PyGeoArrowResult { let out = HasDimensions::is_empty(&arr.as_ref())?; - Ok(PyArray::from_array(out).to_arro3(py)?) + Ok(PyArray::from_array_ref(Arc::new(out)).to_arro3(py)?) } AnyGeometryInput::Chunked(arr) => { let out = HasDimensions::is_empty(&arr.as_ref())?; - Ok(PyChunkedArray::from_arrays(out.chunks())?.to_arro3(py)?) + Ok(PyChunkedArray::from_array_refs(out.chunk_refs())?.to_arro3(py)?) } } } diff --git a/python/core/src/algorithm/geo/frechet_distance.rs b/python/core/src/algorithm/geo/frechet_distance.rs index 43a1ac673..66bf4affd 100644 --- a/python/core/src/algorithm/geo/frechet_distance.rs +++ b/python/core/src/algorithm/geo/frechet_distance.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::error::PyGeoArrowResult; use crate::ffi::from_python::input::AnyGeometryBroadcastInput; use crate::ffi::from_python::AnyGeometryInput; @@ -15,21 +17,21 @@ pub fn frechet_distance( match (input, other) { (AnyGeometryInput::Array(left), AnyGeometryBroadcastInput::Array(right)) => { let result = FrechetDistance::frechet_distance(&left.as_ref(), &right.as_ref())?; - Ok(PyArray::from_array(result).to_arro3(py)?) + Ok(PyArray::from_array_ref(Arc::new(result)).to_arro3(py)?) } (AnyGeometryInput::Chunked(left), AnyGeometryBroadcastInput::Chunked(right)) => { let result = FrechetDistance::frechet_distance(&left.as_ref(), &right.as_ref())?; - Ok(PyChunkedArray::from_arrays(result.chunks())?.to_arro3(py)?) + Ok(PyChunkedArray::from_array_refs(result.chunk_refs())?.to_arro3(py)?) 
} (AnyGeometryInput::Array(left), AnyGeometryBroadcastInput::Scalar(right)) => { let scalar = right.to_geo_line_string()?; let result = FrechetDistanceLineString::frechet_distance(&left.as_ref(), &scalar)?; - Ok(PyArray::from_array(result).to_arro3(py)?) + Ok(PyArray::from_array_ref(Arc::new(result)).to_arro3(py)?) } (AnyGeometryInput::Chunked(left), AnyGeometryBroadcastInput::Scalar(right)) => { let scalar = right.to_geo_line_string()?; let result = FrechetDistanceLineString::frechet_distance(&left.as_ref(), &scalar)?; - Ok(PyChunkedArray::from_arrays(result.chunks())?.to_arro3(py)?) + Ok(PyChunkedArray::from_array_refs(result.chunk_refs())?.to_arro3(py)?) } _ => Err(PyValueError::new_err("Unsupported input types.").into()), } diff --git a/python/core/src/algorithm/geo/geodesic_area.rs b/python/core/src/algorithm/geo/geodesic_area.rs index 76394100b..b34fe1627 100644 --- a/python/core/src/algorithm/geo/geodesic_area.rs +++ b/python/core/src/algorithm/geo/geodesic_area.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::error::PyGeoArrowResult; use crate::ffi::from_python::AnyGeometryInput; use geoarrow::algorithm::geo::GeodesicArea; @@ -9,11 +11,11 @@ pub fn geodesic_perimeter(py: Python, input: AnyGeometryInput) -> PyGeoArrowResu match input { AnyGeometryInput::Array(arr) => { let out = arr.as_ref().geodesic_perimeter()?; - Ok(PyArray::from_array(out).to_arro3(py)?) + Ok(PyArray::from_array_ref(Arc::new(out)).to_arro3(py)?) } AnyGeometryInput::Chunked(arr) => { let out = arr.as_ref().geodesic_perimeter()?; - Ok(PyChunkedArray::from_arrays(out.chunks())?.to_arro3(py)?) + Ok(PyChunkedArray::from_array_refs(out.chunk_refs())?.to_arro3(py)?) 
} } } diff --git a/python/core/src/algorithm/geo/length.rs b/python/core/src/algorithm/geo/length.rs index af1f06480..a8472e3fc 100644 --- a/python/core/src/algorithm/geo/length.rs +++ b/python/core/src/algorithm/geo/length.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::error::PyGeoArrowResult; use crate::ffi::from_python::AnyGeometryInput; use geoarrow::algorithm::geo::{EuclideanLength, GeodesicLength, HaversineLength, VincentyLength}; @@ -43,7 +45,7 @@ pub fn length( LengthMethod::Haversine => arr.as_ref().haversine_length()?, LengthMethod::Vincenty => arr.as_ref().vincenty_length()?, }; - Ok(PyArray::from_array(out).to_arro3(py)?) + Ok(PyArray::from_array_ref(Arc::new(out)).to_arro3(py)?) } AnyGeometryInput::Chunked(arr) => { let out = match method { @@ -52,7 +54,7 @@ pub fn length( LengthMethod::Haversine => arr.as_ref().haversine_length()?, LengthMethod::Vincenty => arr.as_ref().vincenty_length()?, }; - Ok(PyChunkedArray::from_arrays(out.chunks())?.to_arro3(py)?) + Ok(PyChunkedArray::from_array_refs(out.chunk_refs())?.to_arro3(py)?) } } } diff --git a/python/core/src/algorithm/geo/line_locate_point.rs b/python/core/src/algorithm/geo/line_locate_point.rs index 0b202a5fb..ddb49d74d 100644 --- a/python/core/src/algorithm/geo/line_locate_point.rs +++ b/python/core/src/algorithm/geo/line_locate_point.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::error::PyGeoArrowResult; use crate::ffi::from_python::input::AnyGeometryBroadcastInput; use crate::ffi::from_python::AnyGeometryInput; @@ -15,21 +17,21 @@ pub fn line_locate_point( match (input, point) { (AnyGeometryInput::Array(arr), AnyGeometryBroadcastInput::Array(point)) => { let result = LineLocatePoint::line_locate_point(&arr.as_ref(), point.as_ref())?; - Ok(PyArray::from_array(result).to_arro3(py)?) + Ok(PyArray::from_array_ref(Arc::new(result)).to_arro3(py)?) 
} (AnyGeometryInput::Chunked(arr), AnyGeometryBroadcastInput::Chunked(point)) => { let result = LineLocatePoint::line_locate_point(&arr.as_ref(), point.as_ref())?; - Ok(PyChunkedArray::from_arrays(result.chunks())?.to_arro3(py)?) + Ok(PyChunkedArray::from_array_refs(result.chunk_refs())?.to_arro3(py)?) } (AnyGeometryInput::Array(arr), AnyGeometryBroadcastInput::Scalar(point)) => { let scalar = point.to_geo_point()?; let result = LineLocatePointScalar::line_locate_point(&arr.as_ref(), &scalar)?; - Ok(PyArray::from_array(result).to_arro3(py)?) + Ok(PyArray::from_array_ref(Arc::new(result)).to_arro3(py)?) } (AnyGeometryInput::Chunked(arr), AnyGeometryBroadcastInput::Scalar(point)) => { let scalar = point.to_geo_point()?; let result = LineLocatePointScalar::line_locate_point(&arr.as_ref(), &scalar)?; - Ok(PyChunkedArray::from_arrays(result.chunks())?.to_arro3(py)?) + Ok(PyChunkedArray::from_array_refs(result.chunk_refs())?.to_arro3(py)?) } _ => Err(PyValueError::new_err("Unsupported input types.").into()), } diff --git a/python/core/src/algorithm/mod.rs b/python/core/src/algorithm/mod.rs index 1e296b2fa..1eff5ee3e 100644 --- a/python/core/src/algorithm/mod.rs +++ b/python/core/src/algorithm/mod.rs @@ -1,3 +1,5 @@ pub mod geo; pub mod native; + +#[cfg(feature = "libc")] pub mod polylabel; diff --git a/python/core/src/error.rs b/python/core/src/error.rs index 98091e80e..92fe43ec3 100644 --- a/python/core/src/error.rs +++ b/python/core/src/error.rs @@ -6,7 +6,9 @@ pub enum PyGeoArrowError { PyErr(PyErr), PyArrowError(pyo3_arrow::error::PyArrowError), PythonizeError(pythonize::PythonizeError), + #[cfg(feature = "async")] ObjectStoreError(object_store::Error), + #[cfg(feature = "async")] ObjectStorePathError(object_store::path::Error), SerdeJsonError(serde_json::Error), UrlParseError(url::ParseError), @@ -19,7 +21,9 @@ impl From for PyErr { PyGeoArrowError::PyErr(err) => err, PyGeoArrowError::PyArrowError(err) => err.into(), PyGeoArrowError::PythonizeError(err) => 
PyException::new_err(err.to_string()), + #[cfg(feature = "async")] PyGeoArrowError::ObjectStoreError(err) => PyException::new_err(err.to_string()), + #[cfg(feature = "async")] PyGeoArrowError::ObjectStorePathError(err) => PyException::new_err(err.to_string()), PyGeoArrowError::SerdeJsonError(err) => PyException::new_err(err.to_string()), PyGeoArrowError::UrlParseError(err) => PyException::new_err(err.to_string()), @@ -45,12 +49,14 @@ impl From for PyGeoArrowError { } } +#[cfg(feature = "async")] impl From for PyGeoArrowError { fn from(other: object_store::Error) -> Self { Self::ObjectStoreError(other) } } +#[cfg(feature = "async")] impl From for PyGeoArrowError { fn from(other: object_store::path::Error) -> Self { Self::ObjectStorePathError(other) diff --git a/python/core/src/interop/pyogrio/from_pyogrio.rs b/python/core/src/interop/pyogrio/from_pyogrio.rs index d9faf3749..2bdc06f20 100644 --- a/python/core/src/interop/pyogrio/from_pyogrio.rs +++ b/python/core/src/interop/pyogrio/from_pyogrio.rs @@ -60,10 +60,7 @@ pub fn read_pyogrio( .call_method0(intern!(py, "__enter__"))? .extract::<(PyObject, PyObject)>()?; - let maybe_table = PyTable::from_arrow( - &py.get_type_bound::(), - record_batch_reader.bind(py).extract()?, - ); + let maybe_table = record_batch_reader.bind(py).extract::(); // If the eval threw an exception we'll pass it through to the context manager. // Otherwise, __exit__ is called with empty arguments (Python "None"). @@ -74,8 +71,7 @@ pub fn read_pyogrio( context_manager.call_method1("__exit__", (&none, &none, &none))?; Ok(table.to_arro3(py)?) 
} - Err(e) => { - let py_err = PyErr::from(e); + Err(py_err) => { context_manager.call_method1( "__exit__", ( diff --git a/python/core/src/interop/util.rs b/python/core/src/interop/util.rs index 9449d92a7..7180c64d9 100644 --- a/python/core/src/interop/util.rs +++ b/python/core/src/interop/util.rs @@ -51,7 +51,7 @@ pub(crate) fn import_pyogrio(py: Python) -> PyGeoArrowResult> { pub(crate) fn table_to_pytable(table: geoarrow::table::Table) -> PyTable { let (batches, schema) = table.into_inner(); - PyTable::new(batches, schema) + PyTable::try_new(batches, schema).unwrap() } pub(crate) fn pytable_to_table(table: PyTable) -> Result { diff --git a/python/core/src/io/flatgeobuf/async.rs b/python/core/src/io/flatgeobuf/async.rs new file mode 100644 index 000000000..0d19d586e --- /dev/null +++ b/python/core/src/io/flatgeobuf/async.rs @@ -0,0 +1,39 @@ +use crate::error::{PyGeoArrowError, PyGeoArrowResult}; +use crate::interop::util::table_to_pytable; +use crate::io::input::{construct_reader, AnyFileReader}; +use geoarrow::io::flatgeobuf::read_flatgeobuf_async as _read_flatgeobuf_async; +use geoarrow::io::flatgeobuf::FlatGeobufReaderOptions; +use pyo3::exceptions::PyValueError; +use pyo3::prelude::*; + +#[pyfunction] +#[pyo3(signature = (path, *, fs=None, batch_size=65536, bbox=None))] +pub fn read_flatgeobuf_async( + py: Python, + path: PyObject, + fs: Option, + batch_size: usize, + bbox: Option<(f64, f64, f64, f64)>, +) -> PyGeoArrowResult { + let reader = construct_reader(py, path, fs)?; + match reader { + AnyFileReader::Async(async_reader) => { + let fut = pyo3_asyncio_0_21::tokio::future_into_py(py, async move { + let options = FlatGeobufReaderOptions { + batch_size: Some(batch_size), + bbox, + ..Default::default() + }; + let table = _read_flatgeobuf_async(async_reader.store, async_reader.path, options) + .await + .map_err(PyGeoArrowError::GeoArrowError)?; + + Ok(table_to_pytable(table)) + })?; + Ok(fut.into()) + } + AnyFileReader::Sync(_) => { + 
Err(PyValueError::new_err("Local file paths not supported in async reader.").into()) + } + } +} diff --git a/python/core/src/io/flatgeobuf/mod.rs b/python/core/src/io/flatgeobuf/mod.rs new file mode 100644 index 000000000..62131d529 --- /dev/null +++ b/python/core/src/io/flatgeobuf/mod.rs @@ -0,0 +1,7 @@ +#[cfg(feature = "async")] +mod r#async; +mod sync; + +#[cfg(feature = "async")] +pub use r#async::read_flatgeobuf_async; +pub use sync::{read_flatgeobuf, write_flatgeobuf}; diff --git a/python/core/src/io/flatgeobuf.rs b/python/core/src/io/flatgeobuf/sync.rs similarity index 62% rename from python/core/src/io/flatgeobuf.rs rename to python/core/src/io/flatgeobuf/sync.rs index cd5665078..1d47606dd 100644 --- a/python/core/src/io/flatgeobuf.rs +++ b/python/core/src/io/flatgeobuf/sync.rs @@ -2,12 +2,9 @@ use crate::error::{PyGeoArrowError, PyGeoArrowResult}; use crate::interop::util::table_to_pytable; use crate::io::input::sync::FileWriter; use crate::io::input::{construct_reader, AnyFileReader}; -use crate::io::object_store::PyObjectStore; use flatgeobuf::FgbWriterOptions; -use geoarrow::io::flatgeobuf::read_flatgeobuf_async as _read_flatgeobuf_async; use geoarrow::io::flatgeobuf::write_flatgeobuf_with_options as _write_flatgeobuf; use geoarrow::io::flatgeobuf::{read_flatgeobuf as _read_flatgeobuf, FlatGeobufReaderOptions}; -use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use pyo3_arrow::input::AnyRecordBatch; @@ -16,13 +13,16 @@ use pyo3_arrow::input::AnyRecordBatch; pub fn read_flatgeobuf( py: Python, file: PyObject, - fs: Option, + fs: Option, batch_size: usize, bbox: Option<(f64, f64, f64, f64)>, ) -> PyGeoArrowResult { let reader = construct_reader(py, file, fs)?; match reader { + #[cfg(feature = "async")] AnyFileReader::Async(async_reader) => async_reader.runtime.block_on(async move { + use geoarrow::io::flatgeobuf::read_flatgeobuf_async as _read_flatgeobuf_async; + let options = FlatGeobufReaderOptions { batch_size: Some(batch_size), bbox, @@ -46,38 
+46,6 @@ pub fn read_flatgeobuf( } } -#[pyfunction] -#[pyo3(signature = (path, *, fs=None, batch_size=65536, bbox=None))] -pub fn read_flatgeobuf_async( - py: Python, - path: PyObject, - fs: Option, - batch_size: usize, - bbox: Option<(f64, f64, f64, f64)>, -) -> PyGeoArrowResult { - let reader = construct_reader(py, path, fs)?; - match reader { - AnyFileReader::Async(async_reader) => { - let fut = pyo3_asyncio_0_21::tokio::future_into_py(py, async move { - let options = FlatGeobufReaderOptions { - batch_size: Some(batch_size), - bbox, - ..Default::default() - }; - let table = _read_flatgeobuf_async(async_reader.store, async_reader.path, options) - .await - .map_err(PyGeoArrowError::GeoArrowError)?; - - Ok(table_to_pytable(table)) - })?; - Ok(fut.into()) - } - AnyFileReader::Sync(_) => { - Err(PyValueError::new_err("Local file paths not supported in async reader.").into()) - } - } -} - #[pyfunction] #[pyo3(signature = (table, file, *, write_index=true))] pub fn write_flatgeobuf( diff --git a/python/core/src/io/input/mod.rs b/python/core/src/io/input/mod.rs index 7a4be57dc..aea4b3f98 100644 --- a/python/core/src/io/input/mod.rs +++ b/python/core/src/io/input/mod.rs @@ -3,17 +3,23 @@ pub mod sync; use std::sync::Arc; use crate::error::PyGeoArrowResult; +#[cfg(feature = "async")] use crate::io::object_store::PyObjectStore; +#[cfg(feature = "async")] use object_store::http::HttpBuilder; +#[cfg(feature = "async")] use object_store::path::Path; +#[cfg(feature = "async")] use object_store::{ClientOptions, ObjectStore}; use pyo3::exceptions::PyValueError; use sync::FileReader; use pyo3::prelude::*; +#[cfg(feature = "async")] use tokio::runtime::Runtime; use url::Url; +#[cfg(feature = "async")] pub struct AsyncFileReader { pub store: Arc, pub path: Path, @@ -22,6 +28,7 @@ pub struct AsyncFileReader { pub enum AnyFileReader { Sync(FileReader), + #[cfg(feature = "async")] Async(AsyncFileReader), } @@ -32,46 +39,49 @@ pub enum AnyFileReader { pub fn construct_reader( py: 
Python, file: PyObject, - fs: Option, + fs: Option, ) -> PyGeoArrowResult { // If the user passed an object store instance, use that + #[cfg(feature = "async")] if let Some(fs) = fs { + let fs = fs.extract::(py)?; let path = file.extract::(py)?; let async_reader = AsyncFileReader { store: fs.inner, runtime: fs.rt, path: path.into(), }; - Ok(AnyFileReader::Async(async_reader)) - } else { - // If the user's path is a "known" URL (i.e. http(s)) then construct an object store - // instance for them. - if let Ok(path_or_url) = file.extract::(py) { - if path_or_url.starts_with("http") { - let url = Url::parse(&path_or_url)?; - // Expecting that the url input is something like - let store_input = format!("{}://{}", url.scheme(), url.domain().unwrap()); + return Ok(AnyFileReader::Async(async_reader)); + } - let options = ClientOptions::new().with_allow_http(true); - let store = HttpBuilder::new() - .with_url(store_input) - .with_client_options(options) - .build()?; - let path = url.path().trim_start_matches('/'); + // If the user's path is a "known" URL (i.e. http(s)) then construct an object store + // instance for them. 
+ #[cfg(feature = "async")] + if let Ok(path_or_url) = file.extract::(py) { + if path_or_url.starts_with("http") { + let url = Url::parse(&path_or_url)?; + // Expecting that the url input is something like + let store_input = format!("{}://{}", url.scheme(), url.domain().unwrap()); - let runtime = Arc::new( - tokio::runtime::Runtime::new() - .map_err(|err| PyValueError::new_err(err.to_string()))?, - ); - let async_reader = AsyncFileReader { - store: Arc::new(store), - runtime, - path: path.into(), - }; - return Ok(AnyFileReader::Async(async_reader)); - } - } + let options = ClientOptions::new().with_allow_http(true); + let store = HttpBuilder::new() + .with_url(store_input) + .with_client_options(options) + .build()?; + let path = url.path().trim_start_matches('/'); - Ok(AnyFileReader::Sync(file.extract(py)?)) + let runtime = Arc::new( + tokio::runtime::Runtime::new() + .map_err(|err| PyValueError::new_err(err.to_string()))?, + ); + let async_reader = AsyncFileReader { + store: Arc::new(store), + runtime, + path: path.into(), + }; + return Ok(AnyFileReader::Async(async_reader)); + } } + + Ok(AnyFileReader::Sync(file.extract(py)?)) } diff --git a/python/core/src/io/mod.rs b/python/core/src/io/mod.rs index d6cb6cae9..31cdfd12b 100644 --- a/python/core/src/io/mod.rs +++ b/python/core/src/io/mod.rs @@ -6,8 +6,10 @@ pub mod flatgeobuf; pub mod geojson; pub mod geojson_lines; pub mod input; +#[cfg(feature = "async")] pub mod object_store; pub mod parquet; +#[cfg(feature = "async")] pub mod postgis; pub mod wkb; pub mod wkt; diff --git a/python/core/src/io/parquet/reader.rs b/python/core/src/io/parquet/async.rs similarity index 86% rename from python/core/src/io/parquet/reader.rs rename to python/core/src/io/parquet/async.rs index fbaf9bab0..3a5425ee8 100644 --- a/python/core/src/io/parquet/reader.rs +++ b/python/core/src/io/parquet/async.rs @@ -1,12 +1,10 @@ use std::collections::HashMap; -use std::fs::File; use std::sync::Arc; use crate::array::PyGeometryArray; use 
crate::crs::CRS; use crate::error::{PyGeoArrowError, PyGeoArrowResult}; use crate::interop::util::table_to_pytable; -use crate::io::input::sync::FileReader; use crate::io::input::{construct_reader, AnyFileReader}; use crate::io::object_store::PyObjectStore; use crate::io::parquet::options::create_options; @@ -17,89 +15,24 @@ use geoarrow::geo_traits::{CoordTrait, RectTrait}; use geoarrow::io::parquet::metadata::GeoParquetBboxCovering; use geoarrow::io::parquet::{ GeoParquetDatasetMetadata, GeoParquetReaderMetadata, GeoParquetReaderOptions, - GeoParquetRecordBatchReaderBuilder, GeoParquetRecordBatchStream, - GeoParquetRecordBatchStreamBuilder, + GeoParquetRecordBatchStream, GeoParquetRecordBatchStreamBuilder, }; use geoarrow::table::Table; use object_store::{ObjectMeta, ObjectStore}; use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; use parquet::arrow::async_reader::ParquetObjectReader; -use pyo3::exceptions::{PyFileNotFoundError, PyValueError}; +use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use pyo3_arrow::PySchema; use pythonize::depythonize_bound; use tokio::runtime::Runtime; -#[pyfunction] -#[pyo3(signature = (path, *, fs=None, batch_size=None))] -pub fn read_parquet( - py: Python, - path: PyObject, - fs: Option, - batch_size: Option, -) -> PyGeoArrowResult { - let reader = construct_reader(py, path, fs)?; - match reader { - AnyFileReader::Async(async_reader) => { - let table = async_reader.runtime.block_on(async move { - let object_meta = async_reader - .store - .head(&async_reader.path) - .await - .map_err(PyGeoArrowError::ObjectStoreError)?; - let reader = ParquetObjectReader::new(async_reader.store, object_meta); - - let mut geo_options = GeoParquetReaderOptions::default(); - - if let Some(batch_size) = batch_size { - geo_options = geo_options.with_batch_size(batch_size); - } - - let table = GeoParquetRecordBatchStreamBuilder::try_new_with_options( - reader, - ArrowReaderOptions::new().with_page_index(true), - 
geo_options, - ) - .await? - .build()? - .read_table() - .await?; - - Ok::<_, PyGeoArrowError>(table_to_pytable(table).to_arro3(py)?) - })?; - Ok(table) - } - AnyFileReader::Sync(sync_reader) => match sync_reader { - FileReader::File(path, _) => { - let file = File::open(path) - .map_err(|err| PyFileNotFoundError::new_err(err.to_string()))?; - - let mut geo_options = GeoParquetReaderOptions::default(); - - if let Some(batch_size) = batch_size { - geo_options = geo_options.with_batch_size(batch_size); - } - - let table = GeoParquetRecordBatchReaderBuilder::try_new_with_options( - file, - ArrowReaderOptions::new().with_page_index(true), - geo_options, - )? - .build()? - .read_table()?; - Ok(table_to_pytable(table).to_arro3(py)?) - } - _ => Err(PyValueError::new_err("File objects not supported in Parquet reader.").into()), - }, - } -} - #[pyfunction] #[pyo3(signature = (path, *, fs=None, batch_size=None))] pub fn read_parquet_async( py: Python, path: PyObject, - fs: Option, + fs: Option, batch_size: Option, ) -> PyGeoArrowResult { let reader = construct_reader(py, path, fs)?; diff --git a/python/core/src/io/parquet/mod.rs b/python/core/src/io/parquet/mod.rs index 15fcbc1c7..16bbf045e 100644 --- a/python/core/src/io/parquet/mod.rs +++ b/python/core/src/io/parquet/mod.rs @@ -1,3 +1,9 @@ +#[cfg(feature = "async")] +mod r#async; +mod sync; + pub mod options; -pub mod reader; -pub mod writer; + +#[cfg(feature = "async")] +pub use r#async::{read_parquet_async, ParquetDataset, ParquetFile}; +pub use sync::{read_parquet, write_parquet, ParquetWriter}; diff --git a/python/core/src/io/parquet/writer.rs b/python/core/src/io/parquet/sync.rs similarity index 51% rename from python/core/src/io/parquet/writer.rs rename to python/core/src/io/parquet/sync.rs index 3edc9b2f7..2fd40208c 100644 --- a/python/core/src/io/parquet/writer.rs +++ b/python/core/src/io/parquet/sync.rs @@ -1,15 +1,93 @@ -use crate::error::PyGeoArrowResult; -use crate::io::input::sync::FileWriter; +use 
std::fs::File; + +use crate::error::{PyGeoArrowError, PyGeoArrowResult}; +use crate::interop::util::table_to_pytable; +use crate::io::input::sync::{FileReader, FileWriter}; +use crate::io::input::{construct_reader, AnyFileReader}; + +use geoarrow::io::parquet::{GeoParquetReaderOptions, GeoParquetRecordBatchReaderBuilder}; +use parquet::arrow::arrow_reader::ArrowReaderOptions; +use pyo3::exceptions::{PyFileNotFoundError, PyValueError}; +use pyo3::prelude::*; +use pyo3_arrow::PyRecordBatch; +use pyo3_arrow::PySchema; use geoarrow::io::parquet::{ write_geoparquet as _write_geoparquet, GeoParquetWriter as _GeoParquetWriter, GeoParquetWriterOptions, }; -use pyo3::exceptions::PyValueError; -use pyo3::prelude::*; use pyo3_arrow::input::AnyRecordBatch; -use pyo3_arrow::{PyRecordBatch, PySchema}; +#[pyfunction] +#[pyo3(signature = (path, *, fs=None, batch_size=None))] +pub fn read_parquet( + py: Python, + path: PyObject, + fs: Option, + batch_size: Option, +) -> PyGeoArrowResult { + let reader = construct_reader(py, path, fs)?; + match reader { + #[cfg(feature = "async")] + AnyFileReader::Async(async_reader) => { + use geoarrow::io::parquet::GeoParquetRecordBatchStreamBuilder; + use object_store::ObjectStore; + use parquet::arrow::async_reader::ParquetObjectReader; + + let table = async_reader.runtime.block_on(async move { + let object_meta = async_reader + .store + .head(&async_reader.path) + .await + .map_err(PyGeoArrowError::ObjectStoreError)?; + let reader = ParquetObjectReader::new(async_reader.store, object_meta); + + let mut geo_options = GeoParquetReaderOptions::default(); + + if let Some(batch_size) = batch_size { + geo_options = geo_options.with_batch_size(batch_size); + } + + let table = GeoParquetRecordBatchStreamBuilder::try_new_with_options( + reader, + ArrowReaderOptions::new().with_page_index(true), + geo_options, + ) + .await? + .build()? + .read_table() + .await?; + + Ok::<_, PyGeoArrowError>(table_to_pytable(table).to_arro3(py)?) 
+ })?; + Ok(table) + } + AnyFileReader::Sync(sync_reader) => match sync_reader { + FileReader::File(path, _) => { + let file = File::open(path) + .map_err(|err| PyFileNotFoundError::new_err(err.to_string()))?; + + let mut geo_options = GeoParquetReaderOptions::default(); + + if let Some(batch_size) = batch_size { + geo_options = geo_options.with_batch_size(batch_size); + } + + let table = GeoParquetRecordBatchReaderBuilder::try_new_with_options( + file, + ArrowReaderOptions::new().with_page_index(true), + geo_options, + )? + .build()? + .read_table()?; + Ok(table_to_pytable(table).to_arro3(py)?) + } + _ => Err(PyValueError::new_err("File objects not supported in Parquet reader.").into()), + }, + } +} + +#[allow(clippy::upper_case_acronyms)] pub enum GeoParquetEncoding { WKB, Native, diff --git a/python/core/src/lib.rs b/python/core/src/lib.rs index 86e9b3879..3d1fa87b9 100644 --- a/python/core/src/lib.rs +++ b/python/core/src/lib.rs @@ -96,6 +96,8 @@ fn _rust(_py: Python, m: &Bound) -> PyResult<()> { crate::algorithm::native::total_bounds::total_bounds, m )?)?; + + #[cfg(feature = "libc")] m.add_function(wrap_pyfunction!(crate::algorithm::polylabel::polylabel, m)?)?; // Top-level table functions @@ -106,37 +108,37 @@ fn _rust(_py: Python, m: &Bound) -> PyResult<()> { )?)?; m.add_function(wrap_pyfunction!(crate::table::geometry_col, m)?)?; - // IO + // Async IO + + #[cfg(feature = "async")] + { + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + + m.add_function(wrap_pyfunction!( + crate::io::flatgeobuf::read_flatgeobuf_async, + m + )?)?; + m.add_function(wrap_pyfunction!(crate::io::parquet::read_parquet_async, m)?)?; + + m.add_function(wrap_pyfunction!(crate::io::postgis::read_postgis, m)?)?; + m.add_function(wrap_pyfunction!(crate::io::postgis::read_postgis_async, m)?)?; + } - m.add_class::()?; - m.add_class::()?; - m.add_class::()?; + // IO m.add_function(wrap_pyfunction!(crate::io::csv::read_csv, m)?)?; 
m.add_function(wrap_pyfunction!(crate::io::flatgeobuf::read_flatgeobuf, m)?)?; - m.add_function(wrap_pyfunction!( - crate::io::flatgeobuf::read_flatgeobuf_async, - m - )?)?; m.add_function(wrap_pyfunction!(crate::io::geojson::read_geojson, m)?)?; m.add_function(wrap_pyfunction!( crate::io::geojson_lines::read_geojson_lines, m )?)?; - m.add_function(wrap_pyfunction!( - crate::io::parquet::reader::read_parquet, - m - )?)?; - m.add_function(wrap_pyfunction!( - crate::io::parquet::reader::read_parquet_async, - m - )?)?; - m.add_function(wrap_pyfunction!( - crate::io::parquet::writer::write_parquet, - m - )?)?; - m.add_function(wrap_pyfunction!(crate::io::postgis::read_postgis, m)?)?; - m.add_function(wrap_pyfunction!(crate::io::postgis::read_postgis_async, m)?)?; + m.add_function(wrap_pyfunction!(crate::io::parquet::read_parquet, m)?)?; + m.add_function(wrap_pyfunction!(crate::io::parquet::write_parquet, m)?)?; + m.add_class::()?; + m.add_function(wrap_pyfunction!( crate::interop::pyogrio::from_pyogrio::read_pyogrio, m diff --git a/src/chunked_array.rs b/src/chunked_array.rs index d722975c1..f536e1b9b 100644 --- a/src/chunked_array.rs +++ b/src/chunked_array.rs @@ -12,7 +12,7 @@ use std::collections::HashSet; use std::sync::Arc; use arrow::array::OffsetSizeTrait; -use arrow_array::Array; +use arrow_array::{make_array, Array, ArrayRef}; use arrow_schema::{DataType, Field}; #[cfg(feature = "rayon")] @@ -168,6 +168,14 @@ impl ChunkedArray { self.chunks.as_slice() } + /// Returns a Vec of dynamically-typed [ArrayRef]. + pub fn chunk_refs(&self) -> Vec { + self.chunks + .iter() + .map(|arr| make_array(arr.to_data())) + .collect() + } + /// Applies an operation over each chunk of this chunked array. /// /// If the `rayon` feature is enabled, this will be done in parallel.