Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement GetSize for array #8995

Merged
merged 4 commits into from
Apr 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/common/src/array/bool_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use risingwave_pb::data::{ArrayType, PbArray};
use super::{Array, ArrayBuilder, ArrayMeta};
use crate::array::ArrayBuilderImpl;
use crate::buffer::{Bitmap, BitmapBuilder};
use crate::collection::estimate_size::EstimateSize;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BoolArray {
Expand Down Expand Up @@ -66,6 +67,12 @@ impl FromIterator<bool> for BoolArray {
}
}

impl EstimateSize for BoolArray {
    /// Estimates the heap bytes owned by this array by adding together the
    /// estimates reported by its `bitmap` and `data` fields.
    fn estimated_heap_size(&self) -> usize {
        let bitmap_bytes = self.bitmap.estimated_heap_size();
        let data_bytes = self.data.estimated_heap_size();
        bitmap_bytes + data_bytes
    }
}

impl Array for BoolArray {
type Builder = BoolArrayBuilder;
type OwnedItem = bool;
Expand Down Expand Up @@ -194,6 +201,8 @@ mod tests {
})
.collect_vec();
let array = helper_test_builder(v.clone());
assert_eq!(256, array.estimated_heap_size());
assert_eq!(320, array.estimated_size());
let res = v.iter().zip_eq_fast(array.iter()).all(|(a, b)| *a == b);
assert!(res);
}
Expand Down
9 changes: 9 additions & 0 deletions src/common/src/array/bytes_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use risingwave_pb::data::{ArrayType, PbArray};
use super::{Array, ArrayBuilder, ArrayMeta};
use crate::array::ArrayBuilderImpl;
use crate::buffer::{Bitmap, BitmapBuilder};
use crate::collection::estimate_size::EstimateSize;
use crate::util::iter_util::ZipEqDebug;

/// `BytesArray` is a collection of Rust `[u8]`s.
Expand All @@ -32,6 +33,14 @@ pub struct BytesArray {
data: Vec<u8>,
}

impl EstimateSize for BytesArray {
    /// Estimates the heap bytes owned by this array.
    ///
    /// The result is the sum of three parts: the reserved capacity of `offset`
    /// (counted as `u32`-sized entries), the bitmap's own estimate, and the
    /// reserved capacity of the `data` byte buffer.
    fn estimated_heap_size(&self) -> usize {
        let offset_bytes = self.offset.capacity() * size_of::<u32>();
        let bitmap_bytes = self.bitmap.estimated_heap_size();
        // `data` is a `Vec<u8>`, so its capacity is already a byte count.
        let payload_bytes = self.data.capacity();
        offset_bytes + bitmap_bytes + payload_bytes
    }
}

impl Array for BytesArray {
type Builder = BytesArrayBuilder;
type OwnedItem = Box<[u8]>;
Expand Down
10 changes: 10 additions & 0 deletions src/common/src/array/jsonb_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::mem::size_of;

use postgres_types::{FromSql as _, ToSql as _, Type};
use serde_json::Value;

use super::{Array, ArrayBuilder};
use crate::buffer::{Bitmap, BitmapBuilder};
use crate::collection::estimate_size::EstimateSize;
use crate::types::{Scalar, ScalarRef};
use crate::util::iter_util::ZipEqFast;

Expand Down Expand Up @@ -470,3 +473,10 @@ impl serde_json::ser::Formatter for ToTextFormatter {
writer.write_all(b": ")
}
}

// TODO: We need to fix this later. Each reserved slot is charged only its inline
// `size_of::<Value>()`; whatever a `Value` itself keeps on the heap is not counted.
impl EstimateSize for JsonbArray {
    /// Adds the bitmap's heap estimate to `data.capacity() * size_of::<Value>()`.
    fn estimated_heap_size(&self) -> usize {
        let bitmap_bytes = self.bitmap.estimated_heap_size();
        let slot_bytes = self.data.capacity() * size_of::<Value>();
        bitmap_bytes + slot_bytes
    }
}
10 changes: 10 additions & 0 deletions src/common/src/array/list_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use core::fmt;
use std::cmp::Ordering;
use std::fmt::Debug;
use std::hash::Hash;
use std::mem::size_of;

use bytes::{Buf, BufMut};
use itertools::EitherOrBoth::{Both, Left, Right};
Expand All @@ -25,6 +26,7 @@ use serde::{Deserializer, Serializer};

use super::{Array, ArrayBuilder, ArrayBuilderImpl, ArrayImpl, ArrayMeta, ArrayResult, RowRef};
use crate::buffer::{Bitmap, BitmapBuilder};
use crate::collection::estimate_size::EstimateSize;
use crate::row::Row;
use crate::types::to_text::ToText;
use crate::types::{hash_datum, DataType, Datum, DatumRef, Scalar, ScalarRefImpl, ToDatumRef};
Expand Down Expand Up @@ -160,6 +162,14 @@ pub struct ListArray {
pub(super) value_type: DataType,
}

impl EstimateSize for ListArray {
fn estimated_heap_size(&self) -> usize {
self.bitmap.estimated_heap_size()
+ self.offsets.capacity() * size_of::<u32>()
+ self.value.estimated_heap_size()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Given value: Box<ArrayImpl>, it seems to me this should be self.value.estimated_size() to include Self?
  • DataType could be recursive and also takes some space on the heap. What's the impact of underestimation?

Copy link
Contributor Author

@liurenjie1024 liurenjie1024 Apr 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but I want to keep things simple for now: our goal is to count the most memory-consuming parts, so leaving out small things like Arc, Box, and DataType is appropriate. As the name suggests, it's an estimated size.

}
}

impl Array for ListArray {
type Builder = ListArrayBuilder;
type OwnedItem = ListValue;
Expand Down
21 changes: 19 additions & 2 deletions src/common/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,10 @@ pub use crate::array::num256_array::{
Int256Array, Int256ArrayBuilder, Uint256Array, Uint256ArrayBuilder,
};
use crate::buffer::Bitmap;
use crate::collection::estimate_size::EstimateSize;
use crate::types::*;
use crate::util::iter_util::ZipEqFast;
pub type ArrayResult<T> = std::result::Result<T, ArrayError>;
pub type ArrayResult<T> = Result<T, ArrayError>;

pub type I64Array = PrimitiveArray<i64>;
pub type I32Array = PrimitiveArray<i32>;
Expand Down Expand Up @@ -162,7 +163,9 @@ pub trait ArrayBuilder: Send + Sync + Sized + 'static {
/// In some cases, we will need to store owned data. For example, when aggregating min
/// and max, we need to store current maximum in the aggregator. In this case, we
/// could use `A::OwnedItem` in aggregator struct.
pub trait Array: std::fmt::Debug + Send + Sync + Sized + 'static + Into<ArrayImpl> {
pub trait Array:
std::fmt::Debug + Send + Sync + Sized + 'static + Into<ArrayImpl> + EstimateSize
{
/// A reference to item in array, as well as return type of `value_at`, which is
/// reciprocal to `Self::OwnedItem`.
type RefItem<'a>: ScalarRef<'a, ScalarType = Self::OwnedItem>
Expand Down Expand Up @@ -676,6 +679,20 @@ macro_rules! impl_array {

for_all_variants! { impl_array }

// Generates `impl EstimateSize for ArrayImpl`.
//
// The macro takes a comma-separated list of
// `{ variant_name, suffix_name, array_type, builder_type }` tuples and emits one
// `match` arm per tuple. Only `$variant_name` is used in the expansion; the other
// three fragments are accepted but ignored here.
macro_rules! impl_array_estimate_size {
($({ $variant_name:ident, $suffix_name:ident, $array:ty, $builder:ty } ),*) => {
impl EstimateSize for ArrayImpl {
fn estimated_heap_size(&self) -> usize {
// Forward to the `estimated_heap_size` of whichever variant is held.
match self {
$( Self::$variant_name(inner) => inner.estimated_heap_size(), )*
}
}
}
}
}

for_all_variants! { impl_array_estimate_size }

impl ArrayImpl {
pub fn iter(&self) -> impl DoubleEndedIterator<Item = DatumRef<'_>> + ExactSizeIterator {
(0..self.len()).map(|i| self.value_at(i))
Expand Down
14 changes: 14 additions & 0 deletions src/common/src/array/num256_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::io::{Cursor, Read};
use std::mem::size_of;

use ethnum::{I256, U256};
use risingwave_pb::common::buffer::CompressionType;
Expand All @@ -21,6 +22,7 @@ use risingwave_pb::data::PbArray;

use crate::array::{Array, ArrayBuilder, ArrayImpl, ArrayResult};
use crate::buffer::{Bitmap, BitmapBuilder};
use crate::collection::estimate_size::EstimateSize;
use crate::types::num256::{Int256, Int256Ref, Uint256, Uint256Ref};
use crate::types::Scalar;

Expand Down Expand Up @@ -204,3 +206,15 @@ impl_array_for_num256!(
Int256Ref<'a>,
Int256
);

impl EstimateSize for Uint256Array {
    /// Estimates the heap bytes owned by this array: the bitmap's estimate plus
    /// the reserved capacity of `data` counted in `U256`-sized elements.
    fn estimated_heap_size(&self) -> usize {
        let bitmap_bytes = self.bitmap.estimated_heap_size();
        let value_bytes = self.data.capacity() * size_of::<U256>();
        bitmap_bytes + value_bytes
    }
}

impl EstimateSize for Int256Array {
    /// Estimates the heap bytes owned by this array: the bitmap's estimate plus
    /// the reserved capacity of `data` counted in `I256`-sized elements.
    fn estimated_heap_size(&self) -> usize {
        let bitmap_bytes = self.bitmap.estimated_heap_size();
        let value_bytes = self.data.capacity() * size_of::<I256>();
        bitmap_bytes + value_bytes
    }
}
7 changes: 7 additions & 0 deletions src/common/src/array/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use super::{Array, ArrayBuilder, ArrayResult};
use crate::array::serial_array::Serial;
use crate::array::{ArrayBuilderImpl, ArrayImpl, ArrayMeta};
use crate::buffer::{Bitmap, BitmapBuilder};
use crate::collection::estimate_size::EstimateSize;
use crate::for_all_native_types;
use crate::types::decimal::Decimal;
use crate::types::interval::Interval;
Expand Down Expand Up @@ -273,6 +274,12 @@ impl<T: PrimitiveArrayItemType> ArrayBuilder for PrimitiveArrayBuilder<T> {
}
}

impl<T: PrimitiveArrayItemType> EstimateSize for PrimitiveArray<T> {
    /// Estimates the heap bytes owned by this array: the bitmap's estimate plus
    /// the reserved capacity of `data` counted in `T`-sized elements.
    fn estimated_heap_size(&self) -> usize {
        let bitmap_bytes = self.bitmap.estimated_heap_size();
        let value_bytes = self.data.capacity() * size_of::<T>();
        bitmap_bytes + value_bytes
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
72 changes: 49 additions & 23 deletions src/common/src/array/struct_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use risingwave_pb::data::{PbArray, PbArrayType, StructArrayData};
use super::{Array, ArrayBuilder, ArrayBuilderImpl, ArrayImpl, ArrayMeta, ArrayResult};
use crate::array::ArrayRef;
use crate::buffer::{Bitmap, BitmapBuilder};
use crate::collection::estimate_size::EstimateSize;
use crate::types::to_text::ToText;
use crate::types::{hash_datum, DataType, Datum, DatumRef, Scalar, ScalarRefImpl, ToDatumRef};
use crate::util::iter_util::ZipEqFast;
Expand Down Expand Up @@ -128,13 +129,13 @@ impl ArrayBuilder for StructArrayBuilder {
.into_iter()
.map(|b| Arc::new(b.finish()))
.collect::<Vec<ArrayRef>>();
StructArray {
bitmap: self.bitmap.finish(),
StructArray::new(
self.bitmap.finish(),
children,
children_type: self.children_type,
children_names: self.children_names,
len: self.len,
}
self.children_type,
self.children_names,
self.len,
)
}
}

Expand All @@ -145,6 +146,8 @@ pub struct StructArray {
children_type: Arc<[DataType]>,
children_names: Arc<[String]>,
len: usize,

heap_size: usize,
}

impl StructArrayBuilder {
Expand Down Expand Up @@ -219,6 +222,29 @@ impl Array for StructArray {
}

impl StructArray {
fn new(
bitmap: Bitmap,
children: Vec<ArrayRef>,
children_type: Arc<[DataType]>,
children_names: Arc<[String]>,
len: usize,
) -> Self {
let heap_size = bitmap.estimated_heap_size()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Underestimating children_type, children_names, Vec of Arc. Not sure how much this matters.

Copy link
Contributor Author

@liurenjie1024 liurenjie1024 Apr 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

children_type and children_names may be shared across different data chunks, so it's inappropriate to count them. The Vec of Arc is left out for the reason given above.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. This means children can also be shared because ArrayRef is Arc. Is this acceptable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. This means children can also be shared because ArrayRef is Arc. Is this acceptable?

Yes, it's possible, so the caller (e.g. the executor that counts its memory usage) should be responsible for this. It's hard to handle without an allocator.

+ children
.iter()
.map(|c| c.estimated_heap_size())
.sum::<usize>();

Self {
bitmap,
children,
children_type,
children_names,
len,
heap_size,
}
}

pub fn from_protobuf(array: &PbArray) -> ArrayResult<ArrayImpl> {
ensure!(
array.values.is_empty(),
Expand All @@ -238,13 +264,7 @@ impl StructArray {
.map(DataType::from)
.collect::<Vec<DataType>>()
.into();
let arr = StructArray {
bitmap,
children,
children_type,
children_names: vec![].into(),
len: cardinality,
};
let arr = Self::new(bitmap, children, children_type, vec![].into(), cardinality);
Ok(arr.into())
}

Expand Down Expand Up @@ -273,13 +293,13 @@ impl StructArray {
let cardinality = null_bitmap.len();
let bitmap = Bitmap::from_iter(null_bitmap.to_vec());
let children = children.into_iter().map(Arc::new).collect_vec();
StructArray {
Self::new(
bitmap,
children_type: children_type.into(),
children_names: vec![].into(),
len: cardinality,
children,
}
children_type.into(),
vec![].into(),
cardinality,
)
}

pub fn from_slices_with_field_names(
Expand All @@ -291,13 +311,13 @@ impl StructArray {
let cardinality = null_bitmap.len();
let bitmap = Bitmap::from_iter(null_bitmap.to_vec());
let children = children.into_iter().map(Arc::new).collect_vec();
StructArray {
Self::new(
bitmap,
children_type: children_type.into(),
children_names: children_name.into(),
len: cardinality,
children,
}
children_type.into(),
children_name.into(),
cardinality,
)
}

#[cfg(test)]
Expand All @@ -310,6 +330,12 @@ impl StructArray {
}
}

impl EstimateSize for StructArray {
// Returns the value stored in the `heap_size` field rather than recomputing it.
// `StructArray::new` fills that field with the bitmap's estimate plus the sum
// of every child's `estimated_heap_size()`.
fn estimated_heap_size(&self) -> usize {
self.heap_size
}
}

#[derive(Clone, Debug, PartialEq, Eq, Default, Hash)]
pub struct StructValue {
fields: Box<[Datum]>,
Expand Down
7 changes: 7 additions & 0 deletions src/common/src/array/utf8_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,20 @@ use super::bytes_array::{BytesWriter, PartialBytesWriter};
use super::{Array, ArrayBuilder, ArrayMeta, BytesArray, BytesArrayBuilder};
use crate::array::ArrayBuilderImpl;
use crate::buffer::Bitmap;
use crate::collection::estimate_size::EstimateSize;

/// `Utf8Array` is a collection of Rust Utf8 `str`s. It's a wrapper of `BytesArray`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Utf8Array {
// The wrapped `BytesArray`; this is the struct's only field.
// NOTE(review): presumably every stored item is valid UTF-8 — confirm against the builder.
bytes: BytesArray,
}

impl EstimateSize for Utf8Array {
    /// Reports the heap estimate of the wrapped `BytesArray`; the wrapper adds
    /// nothing of its own.
    fn estimated_heap_size(&self) -> usize {
        let Self { bytes } = self;
        bytes.estimated_heap_size()
    }
}

impl Array for Utf8Array {
type Builder = Utf8ArrayBuilder;
type OwnedItem = Box<str>;
Expand Down
Loading