Skip to content

Commit

Permalink
feat(common): introduce SerialArrayBuilder and SerialArray (#8357)
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky authored Mar 7, 2023
1 parent 0a6638c commit bf83e28
Show file tree
Hide file tree
Showing 11 changed files with 62 additions and 25 deletions.
6 changes: 6 additions & 0 deletions dashboard/proto/gen/data.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions proto/data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ enum ArrayType {
LIST = 14;
BYTEA = 15;
JSONB = 16;
SERIAL = 17;
}

message Array {
Expand Down
1 change: 1 addition & 0 deletions src/common/src/array/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ impl From<&ListArray> for arrow_array::ListArray {
|b, v| b.append_option(v.map(|d| d.into_arrow())),
),
ArrayImpl::Jsonb(_) => todo!("list of jsonb"),
ArrayImpl::Serial(_) => todo!("list of serial"),
ArrayImpl::Struct(_) => todo!("list of struct"),
ArrayImpl::List(_) => todo!("list of list"),
ArrayImpl::Bytea(a) => build(
Expand Down
5 changes: 5 additions & 0 deletions src/common/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ pub use utf8_array::*;
pub use vis::{Vis, VisRef};

pub use self::error::ArrayError;
use crate::array::serial_array::{Serial, SerialArray, SerialArrayBuilder};
use crate::buffer::Bitmap;
use crate::types::*;
use crate::util::iter_util::ZipEqFast;
Expand Down Expand Up @@ -347,6 +348,7 @@ macro_rules! for_all_variants {
{ NaiveDateTime, naivedatetime, NaiveDateTimeArray, NaiveDateTimeArrayBuilder },
{ NaiveTime, naivetime, NaiveTimeArray, NaiveTimeArrayBuilder },
{ Jsonb, jsonb, JsonbArray, JsonbArrayBuilder },
{ Serial, serial, SerialArray, SerialArrayBuilder },
{ Struct, struct, StructArray, StructArrayBuilder },
{ List, list, ListArray, ListArrayBuilder },
{ Bytea, bytea, BytesArray, BytesArrayBuilder}
Expand Down Expand Up @@ -667,6 +669,9 @@ impl ArrayImpl {
ProstArrayType::Int16 => read_numeric_array::<i16, I16ValueReader>(array, cardinality)?,
ProstArrayType::Int32 => read_numeric_array::<i32, I32ValueReader>(array, cardinality)?,
ProstArrayType::Int64 => read_numeric_array::<i64, I64ValueReader>(array, cardinality)?,
ProstArrayType::Serial => {
read_numeric_array::<Serial, SerialValueReader>(array, cardinality)?
}
ProstArrayType::Float32 => {
read_numeric_array::<OrderedF32, F32ValueReader>(array, cardinality)?
}
Expand Down
1 change: 1 addition & 0 deletions src/common/src/array/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use risingwave_pb::common::Buffer;
use risingwave_pb::data::{Array as ProstArray, ArrayType};

use super::{Array, ArrayBuilder, ArrayResult};
use crate::array::serial_array::Serial;
use crate::array::{ArrayBuilderImpl, ArrayImpl, ArrayMeta};
use crate::buffer::{Bitmap, BitmapBuilder};
use crate::for_all_native_types;
Expand Down
36 changes: 12 additions & 24 deletions src/common/src/array/serial_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,21 @@ use std::hash::Hash;
use postgres_types::{ToSql as _, Type};
use serde::{Serialize, Serializer};

use crate::types::{Scalar, ScalarRef};
use crate::array::{PrimitiveArray, PrimitiveArrayBuilder};

#[derive(Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd)]
// Serial is an alias for i64
#[derive(Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd, Default, Hash)]
pub struct Serial(i64);

pub type SerialArray = PrimitiveArray<Serial>;
pub type SerialArrayBuilder = PrimitiveArrayBuilder<Serial>;

impl From<i64> for Serial {
fn from(value: i64) -> Self {
Self(value)
}
}

impl Serial {
#[inline]
pub fn into_inner(self) -> i64 {
Expand Down Expand Up @@ -62,25 +72,3 @@ impl crate::types::to_binary::ToBinary for Serial {
Ok(Some(output.freeze()))
}
}

/// Implement `Scalar` for `Serial`.
impl Scalar for Serial {
type ScalarRefType<'a> = Serial;

fn as_scalar_ref(&self) -> Self::ScalarRefType<'_> {
Serial(self.0)
}
}

/// Implement `ScalarRef` for `Serial`.
impl<'a> ScalarRef<'a> for Serial {
type ScalarType = Serial;

fn to_owned_scalar(&self) -> Serial {
*self
}

fn hash_scalar<H: std::hash::Hasher>(&self, state: &mut H) {
self.0.hash(state)
}
}
5 changes: 4 additions & 1 deletion src/common/src/array/value_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use byteorder::{BigEndian, ReadBytesExt};

use super::ArrayResult;
use crate::array::{
ArrayBuilder, BytesArrayBuilder, JsonbArrayBuilder, PrimitiveArrayItemType, Utf8ArrayBuilder,
ArrayBuilder, BytesArrayBuilder, JsonbArrayBuilder, PrimitiveArrayItemType, Serial,
Utf8ArrayBuilder,
};
use crate::types::{Decimal, OrderedF32, OrderedF64};

Expand All @@ -31,6 +32,7 @@ pub trait PrimitiveValueReader<T: PrimitiveArrayItemType> {
pub struct I16ValueReader;
pub struct I32ValueReader;
pub struct I64ValueReader;
pub struct SerialValueReader;
pub struct F32ValueReader;
pub struct F64ValueReader;

Expand All @@ -50,6 +52,7 @@ macro_rules! impl_numeric_value_reader {
impl_numeric_value_reader!(i16, I16ValueReader, read_i16);
impl_numeric_value_reader!(i32, I32ValueReader, read_i32);
impl_numeric_value_reader!(i64, I64ValueReader, read_i64);
impl_numeric_value_reader!(Serial, SerialValueReader, read_i64);
impl_numeric_value_reader!(OrderedF32, F32ValueReader, read_f32);
impl_numeric_value_reader!(OrderedF64, F64ValueReader, read_f64);

Expand Down
13 changes: 13 additions & 0 deletions src/common/src/hash/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use std::io::{Cursor, Read};
use chrono::{Datelike, Timelike};
use fixedbitset::FixedBitSet;

use crate::array::serial_array::Serial;
use crate::array::{
Array, ArrayBuilder, ArrayBuilderImpl, ArrayError, ArrayImpl, ArrayResult, DataChunk, JsonbRef,
ListRef, StructRef,
Expand Down Expand Up @@ -477,6 +478,18 @@ impl<'a> HashKeySerDe<'a> for JsonbRef<'a> {
}
}

impl<'a> HashKeySerDe<'a> for Serial {
type S = <i64 as HashKeySerDe<'a>>::S;

fn serialize(self) -> Self::S {
self.into_inner().serialize()
}

fn deserialize<R: Read>(source: &mut R) -> Self {
i64::deserialize(source).into()
}
}

impl<'a> HashKeySerDe<'a> for StructRef<'a> {
type S = Vec<u8>;

Expand Down
9 changes: 9 additions & 0 deletions src/common/src/test_utils/rand_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use rand::prelude::Distribution;
use rand::rngs::SmallRng;
use rand::{Rng, SeedableRng};

use crate::array::serial_array::Serial;
use crate::array::{Array, ArrayBuilder, ArrayRef, JsonbVal, ListValue, StructValue};
use crate::types::{
Decimal, IntervalUnit, NaiveDateTimeWrapper, NaiveDateWrapper, NaiveTimeWrapper, NativeType,
Expand Down Expand Up @@ -117,6 +118,13 @@ impl RandValue for bool {
}
}

impl RandValue for Serial {
fn rand_value<R: Rng>(rand: &mut R) -> Self {
// TODO(peng), serial should be in format of RowId
i64::rand_value(rand).into()
}
}

impl RandValue for JsonbVal {
fn rand_value<R: rand::Rng>(_rand: &mut R) -> Self {
JsonbVal::dummy()
Expand Down Expand Up @@ -177,6 +185,7 @@ where
mod tests {
use super::*;
use crate::array::interval_array::IntervalArray;
use crate::array::serial_array::SerialArray;
use crate::array::*;
use crate::for_all_variants;

Expand Down
1 change: 1 addition & 0 deletions src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,7 @@ macro_rules! for_all_native_types {
{ i16, Int16 },
{ i32, Int32 },
{ i64, Int64 },
{ Serial, Serial },
{ $crate::types::OrderedF32, Float32 },
{ $crate::types::OrderedF64, Float64 }
}
Expand Down
9 changes: 9 additions & 0 deletions src/common/src/types/native_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::fmt::Debug;
use std::io::Write;

use super::{OrderedF32, OrderedF64};
use crate::array::serial_array::Serial;
use crate::array::ArrayResult;

pub trait NativeType:
Expand All @@ -42,6 +43,14 @@ impl NativeType for i64 {
}
}

impl NativeType for Serial {
fn to_protobuf<T: Write>(self, output: &mut T) -> ArrayResult<usize> {
output
.write(&self.into_inner().to_be_bytes())
.map_err(Into::into)
}
}

impl NativeType for OrderedF32 {
fn to_protobuf<T: Write>(self, output: &mut T) -> ArrayResult<usize> {
output.write(&self.to_be_bytes()).map_err(Into::into)
Expand Down

0 comments on commit bf83e28

Please sign in to comment.