From 288552dee5e71b7017dcd051483c7606a9e4a91f Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Wed, 8 Jan 2025 16:25:24 +0100 Subject: [PATCH] Port PendingRow to arrow-rs --- Cargo.lock | 7 +++-- Cargo.toml | 2 ++ crates/store/re_chunk/src/batcher.rs | 31 ++++++++++++----------- crates/store/re_chunk/src/chunk.rs | 17 +++++++++++++ crates/top/re_sdk/src/recording_stream.rs | 2 +- crates/top/rerun_c/src/lib.rs | 2 +- rerun_py/src/arrow.rs | 2 +- 7 files changed, 41 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 929018b52039..71ba376ac16e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5183,7 +5183,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15" dependencies = [ "bytes", - "heck 0.5.0", + "heck 0.4.1", "itertools 0.13.0", "log", "multimap", @@ -5542,9 +5542,8 @@ dependencies = [ [[package]] name = "re_arrow2" -version = "0.18.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f046c5679b0f305d610f80d93fd51ad702cfc077bbe16d9553a1660a2505160" +version = "0.18.1" +source = "git+https://github.com/rerun-io/re_arrow2.git?branch=emilk/more-arrow-compatibility#0e4b3dd7cd73426b1209ebe0323087452a7c8b91" dependencies = [ "ahash", "arrow-array", diff --git a/Cargo.toml b/Cargo.toml index 6b8eee119203..35b70f963616 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -587,3 +587,5 @@ egui_commonmark = { git = "https://github.com/rerun-io/egui_commonmark.git", bra # walkers = { git = "https://github.com/rerun-io/walkers", rev = "8939cceb3fa49ca8648ee16fe1d8432f5ab0bdcc" } # https://github.com/podusowski/walkers/pull/222 # dav1d = { path = "/home/cmc/dev/rerun-io/rav1d", package = "re_rav1d", version = "0.1.1" } + +re_arrow2 = { git = "https://github.com/rerun-io/re_arrow2.git", branch = "emilk/more-arrow-compatibility" } # TODO : point to main branch diff --git a/crates/store/re_chunk/src/batcher.rs 
b/crates/store/re_chunk/src/batcher.rs index 0ef2fadcddf4..814616eabad8 100644 --- a/crates/store/re_chunk/src/batcher.rs +++ b/crates/store/re_chunk/src/batcher.rs @@ -4,7 +4,8 @@ use std::{ time::{Duration, Instant}, }; -use arrow2::array::{Array as Arrow2Array, PrimitiveArray as Arrow2PrimitiveArray}; +use arrow::array::{Array as ArrowArray, ArrayRef}; +use arrow2::array::PrimitiveArray as Arrow2PrimitiveArray; use crossbeam::channel::{Receiver, Sender}; use nohash_hasher::IntMap; @@ -12,7 +13,7 @@ use re_byte_size::SizeBytes as _; use re_log_types::{EntityPath, ResolvedTimeRange, TimeInt, TimePoint, Timeline}; use re_types_core::ComponentDescriptor; -use crate::{arrow2_util, chunk::ChunkComponents, Chunk, ChunkId, ChunkResult, RowId, TimeColumn}; +use crate::{arrow_util, chunk::ChunkComponents, Chunk, ChunkId, ChunkResult, RowId, TimeColumn}; // --- @@ -679,15 +680,12 @@ pub struct PendingRow { /// The component data. /// /// Each array is a single component, i.e. _not_ a list array. - pub components: IntMap>, + pub components: IntMap, } impl PendingRow { #[inline] - pub fn new( - timepoint: TimePoint, - components: IntMap>, - ) -> Self { + pub fn new(timepoint: TimePoint, components: IntMap) -> Self { Self { row_id: RowId::new(), timepoint, @@ -734,9 +732,9 @@ impl PendingRow { let mut per_name = ChunkComponents::default(); for (component_desc, array) in components { - let list_array = arrow2_util::arrays_to_list_array_opt(&[Some(&*array as _)]); + let list_array = arrow_util::arrays_to_list_array_opt(&[Some(&*array as _)]); if let Some(list_array) = list_array { - per_name.insert_descriptor(component_desc, list_array); + per_name.insert_descriptor_arrow1(component_desc, list_array); } } @@ -826,7 +824,7 @@ impl PendingRow { // Create all the logical list arrays that we're going to need, accounting for the // possibility of sparse components in the data. 
- let mut all_components: IntMap>> = + let mut all_components: IntMap>> = IntMap::default(); for row in &rows { for component_desc in row.components.keys() { @@ -870,9 +868,12 @@ impl PendingRow { for (component_desc, arrays) in std::mem::take(&mut components) { let list_array = - arrow2_util::arrays_to_list_array_opt(&arrays); + arrow_util::arrays_to_list_array_opt(&arrays); if let Some(list_array) = list_array { - per_name.insert_descriptor(component_desc, list_array); + per_name.insert_descriptor_arrow1( + component_desc, + list_array, + ); } } per_name @@ -898,7 +899,7 @@ impl PendingRow { arrays.push( row_components .get(component_desc) - .map(|array| &**array as &dyn Arrow2Array), + .map(|array| &**array as &dyn ArrowArray), ); } } @@ -915,9 +916,9 @@ impl PendingRow { { let mut per_name = ChunkComponents::default(); for (component_desc, arrays) in components { - let list_array = arrow2_util::arrays_to_list_array_opt(&arrays); + let list_array = arrow_util::arrays_to_list_array_opt(&arrays); if let Some(list_array) = list_array { - per_name.insert_descriptor(component_desc, list_array); + per_name.insert_descriptor_arrow1(component_desc, list_array); } } per_name diff --git a/crates/store/re_chunk/src/chunk.rs b/crates/store/re_chunk/src/chunk.rs index ca2605d97dd4..11c8c6baad6c 100644 --- a/crates/store/re_chunk/src/chunk.rs +++ b/crates/store/re_chunk/src/chunk.rs @@ -1,6 +1,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; use ahash::HashMap; +use arrow::array::ListArray as ArrowListArray; use arrow2::{ array::{ Array as Arrow2Array, ListArray as Arrow2ListArray, PrimitiveArray as Arrow2PrimitiveArray, @@ -60,6 +61,22 @@ pub struct ChunkComponents( ); impl ChunkComponents { + /// Like `Self::insert`, but automatically infers the [`ComponentName`] layer. 
+ #[inline] + pub fn insert_descriptor_arrow1( + &mut self, + component_desc: ComponentDescriptor, + list_array: ArrowListArray, + ) -> Option { + // TODO(cmc): revert me + let component_desc = component_desc.untagged(); + self.0 + .entry(component_desc.component_name) + .or_default() + .insert(component_desc, list_array.into()) + .map(|la| la.into()) + } + /// Like `Self::insert`, but automatically infers the [`ComponentName`] layer. #[inline] pub fn insert_descriptor( diff --git a/crates/top/re_sdk/src/recording_stream.rs b/crates/top/re_sdk/src/recording_stream.rs index 7551dc9cca40..54956ff35f97 100644 --- a/crates/top/re_sdk/src/recording_stream.rs +++ b/crates/top/re_sdk/src/recording_stream.rs @@ -1176,7 +1176,7 @@ impl RecordingStream { .into_iter() .map(|comp_batch| { comp_batch - .to_arrow2() + .to_arrow() .map(|array| (comp_batch.descriptor().into_owned(), array)) }) .collect(); diff --git a/crates/top/rerun_c/src/lib.rs b/crates/top/rerun_c/src/lib.rs index d091d9322904..e90100d2a0b6 100644 --- a/crates/top/rerun_c/src/lib.rs +++ b/crates/top/rerun_c/src/lib.rs @@ -820,7 +820,7 @@ fn rr_recording_stream_log_impl( let component_type = component_type_registry.get(*component_type)?; let datatype = component_type.datatype.clone(); let values = unsafe { arrow_array_from_c_ffi(array, datatype) }?; - components.insert(component_type.descriptor.clone(), values); + components.insert(component_type.descriptor.clone(), values.into()); } } diff --git a/rerun_py/src/arrow.rs b/rerun_py/src/arrow.rs index f75e7a04ee30..04c6ea735e03 100644 --- a/rerun_py/src/arrow.rs +++ b/rerun_py/src/arrow.rs @@ -89,7 +89,7 @@ pub fn build_row_from_components( let component_descr = descriptor_to_rust(&component_descr)?; let (list_array, _field) = array_to_rust(&array, &component_descr)?; - components.insert(component_descr, list_array); + components.insert(component_descr, list_array.into()); } Ok(PendingRow {