Skip to content

Commit

Permalink
Update conversion to arro3
Browse files Browse the repository at this point in the history
  • Loading branch information
kylebarron committed Feb 21, 2025
1 parent cd03c76 commit 6b2d076
Showing 1 changed file with 32 additions and 4 deletions.
36 changes: 32 additions & 4 deletions obstore/src/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::ops::AddAssign;
use std::sync::Arc;

use arrow::array::{
ArrayRef, RecordBatch, StringBuilder, TimestampMicrosecondBuilder, UInt64Builder,
ArrayRef, RecordBatch, StringBuilder, StructArray, TimestampMicrosecondBuilder, UInt64Builder,
};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use futures::stream::{BoxStream, Fuse};
Expand All @@ -12,8 +12,9 @@ use object_store::path::Path;
use object_store::{ListResult, ObjectMeta, ObjectStore};
use pyo3::exceptions::{PyImportError, PyStopAsyncIteration, PyStopIteration};
use pyo3::prelude::*;
use pyo3::types::PyDict;
use pyo3::types::{PyDict, PyTuple};
use pyo3::{intern, IntoPyObjectExt};
use pyo3_arrow::ffi::{to_array_pycapsules, to_stream_pycapsule, ArrayIterator};
use pyo3_arrow::{PyRecordBatch, PyTable};
use pyo3_object_store::{get_runtime, PyObjectStore, PyObjectStoreError, PyObjectStoreResult};
use tokio::sync::Mutex;
Expand Down Expand Up @@ -231,8 +232,16 @@ impl<'py> IntoPyObject<'py> for PyRecordBatchWrapper {
type Output = Bound<'py, PyAny>;
type Error = PyErr;

// TODO: use `PyRecordBatch::into_arro3`
fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
Ok(self.0.to_arro3(py)?.bind(py).clone())
let arro3_mod = py.import(intern!(py, "arro3.core"))?;
let record_batch = self.0.into_inner();
let field = Field::new_struct("", record_batch.schema_ref().fields().clone(), false);
let array: ArrayRef = Arc::new(StructArray::from(record_batch.clone()));
let array_pycapsules = to_array_pycapsules(py, field.into(), &array, None)?;
arro3_mod
.getattr(intern!(py, "RecordBatch"))?
.call_method1(intern!(py, "from_arrow_pycapsule"), array_pycapsules)
}
}

Expand All @@ -249,8 +258,27 @@ impl<'py> IntoPyObject<'py> for PyTableWrapper {
type Output = Bound<'py, PyAny>;
type Error = PyErr;

// TODO: use `PyRecordBatch::into_arro3`
fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
Ok(self.0.to_arro3(py)?.bind(py).clone())
let arro3_mod = py.import(intern!(py, "arro3.core"))?;
let (batches, schema) = self.0.into_inner();

let field = schema.fields();
let array_reader = batches.into_iter().map(|batch| {
let arr: ArrayRef = Arc::new(StructArray::from(batch));
Ok(arr)
});
let array_reader = Box::new(ArrayIterator::new(
array_reader,
Field::new_struct("", field.clone(), false)
.with_metadata(schema.metadata.clone())
.into(),
));
let capsule = to_stream_pycapsule(py, array_reader, None)?;
arro3_mod.getattr(intern!(py, "Table"))?.call_method1(
intern!(py, "from_arrow_pycapsule"),
PyTuple::new(py, vec![capsule])?,
)
}
}

Expand Down

0 comments on commit 6b2d076

Please sign in to comment.