Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(common): introduce SerialArrayBuilder and SerialArray #8357

Merged
merged 8 commits into from
Mar 7, 2023
6 changes: 6 additions & 0 deletions dashboard/proto/gen/data.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

115 changes: 38 additions & 77 deletions dashboard/proto/gen/task_service.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions proto/data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ enum ArrayType {
LIST = 14;
BYTEA = 15;
JSONB = 16;
SERIAL = 17;
}

message Array {
Expand Down
1 change: 1 addition & 0 deletions src/common/src/array/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ impl From<&ListArray> for arrow_array::ListArray {
|b, v| b.append_option(v.map(|d| d.into_arrow())),
),
ArrayImpl::Jsonb(_) => todo!("list of jsonb"),
ArrayImpl::Serial(_) => todo!("list of serial"),
ArrayImpl::Struct(_) => todo!("list of struct"),
ArrayImpl::List(_) => todo!("list of list"),
ArrayImpl::Bytea(a) => build(
Expand Down
5 changes: 5 additions & 0 deletions src/common/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ pub use utf8_array::*;
pub use vis::{Vis, VisRef};

pub use self::error::ArrayError;
use crate::array::serial_array::{Serial, SerialArray, SerialArrayBuilder};
use crate::buffer::Bitmap;
use crate::types::*;
use crate::util::iter_util::ZipEqFast;
Expand Down Expand Up @@ -347,6 +348,7 @@ macro_rules! for_all_variants {
{ NaiveDateTime, naivedatetime, NaiveDateTimeArray, NaiveDateTimeArrayBuilder },
{ NaiveTime, naivetime, NaiveTimeArray, NaiveTimeArrayBuilder },
{ Jsonb, jsonb, JsonbArray, JsonbArrayBuilder },
{ Serial, serial, SerialArray, SerialArrayBuilder },
{ Struct, struct, StructArray, StructArrayBuilder },
{ List, list, ListArray, ListArrayBuilder },
{ Bytea, bytea, BytesArray, BytesArrayBuilder}
Expand Down Expand Up @@ -667,6 +669,9 @@ impl ArrayImpl {
ProstArrayType::Int16 => read_numeric_array::<i16, I16ValueReader>(array, cardinality)?,
ProstArrayType::Int32 => read_numeric_array::<i32, I32ValueReader>(array, cardinality)?,
ProstArrayType::Int64 => read_numeric_array::<i64, I64ValueReader>(array, cardinality)?,
ProstArrayType::Serial => {
read_numeric_array::<Serial, SerialValueReader>(array, cardinality)?
}
ProstArrayType::Float32 => {
read_numeric_array::<OrderedF32, F32ValueReader>(array, cardinality)?
}
Expand Down
1 change: 1 addition & 0 deletions src/common/src/array/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use risingwave_pb::common::Buffer;
use risingwave_pb::data::{Array as ProstArray, ArrayType};

use super::{Array, ArrayBuilder, ArrayResult};
use crate::array::serial_array::Serial;
use crate::array::{ArrayBuilderImpl, ArrayImpl, ArrayMeta};
use crate::buffer::{Bitmap, BitmapBuilder};
use crate::for_all_native_types;
Expand Down
36 changes: 12 additions & 24 deletions src/common/src/array/serial_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,21 @@ use std::hash::Hash;
use postgres_types::{ToSql as _, Type};
use serde::{Serialize, Serializer};

use crate::types::{Scalar, ScalarRef};
use crate::array::{PrimitiveArray, PrimitiveArrayBuilder};

#[derive(Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd)]
// Serial is an alias for i64
#[derive(Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd, Default, Hash)]
pub struct Serial(i64);

pub type SerialArray = PrimitiveArray<Serial>;
pub type SerialArrayBuilder = PrimitiveArrayBuilder<Serial>;

impl From<i64> for Serial {
fn from(value: i64) -> Self {
Self(value)
}
}

impl Serial {
#[inline]
pub fn into_inner(self) -> i64 {
Expand Down Expand Up @@ -62,25 +72,3 @@ impl crate::types::to_binary::ToBinary for Serial {
Ok(Some(output.freeze()))
}
}

/// Implement `Scalar` for `Serial`.
impl Scalar for Serial {
type ScalarRefType<'a> = Serial;

fn as_scalar_ref(&self) -> Self::ScalarRefType<'_> {
Serial(self.0)
}
}

/// Implement `ScalarRef` for `Serial`.
impl<'a> ScalarRef<'a> for Serial {
type ScalarType = Serial;

fn to_owned_scalar(&self) -> Serial {
*self
}

fn hash_scalar<H: std::hash::Hasher>(&self, state: &mut H) {
self.0.hash(state)
}
}
5 changes: 4 additions & 1 deletion src/common/src/array/value_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use byteorder::{BigEndian, ReadBytesExt};

use super::ArrayResult;
use crate::array::{
ArrayBuilder, BytesArrayBuilder, JsonbArrayBuilder, PrimitiveArrayItemType, Utf8ArrayBuilder,
ArrayBuilder, BytesArrayBuilder, JsonbArrayBuilder, PrimitiveArrayItemType, Serial,
Utf8ArrayBuilder,
};
use crate::types::{Decimal, OrderedF32, OrderedF64};

Expand All @@ -31,6 +32,7 @@ pub trait PrimitiveValueReader<T: PrimitiveArrayItemType> {
pub struct I16ValueReader;
pub struct I32ValueReader;
pub struct I64ValueReader;
pub struct SerialValueReader;
pub struct F32ValueReader;
pub struct F64ValueReader;

Expand All @@ -50,6 +52,7 @@ macro_rules! impl_numeric_value_reader {
impl_numeric_value_reader!(i16, I16ValueReader, read_i16);
impl_numeric_value_reader!(i32, I32ValueReader, read_i32);
impl_numeric_value_reader!(i64, I64ValueReader, read_i64);
impl_numeric_value_reader!(Serial, SerialValueReader, read_i64);
impl_numeric_value_reader!(OrderedF32, F32ValueReader, read_f32);
impl_numeric_value_reader!(OrderedF64, F64ValueReader, read_f64);

Expand Down
13 changes: 13 additions & 0 deletions src/common/src/hash/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use std::io::{Cursor, Read};
use chrono::{Datelike, Timelike};
use fixedbitset::FixedBitSet;

use crate::array::serial_array::Serial;
use crate::array::{
Array, ArrayBuilder, ArrayBuilderImpl, ArrayError, ArrayImpl, ArrayResult, DataChunk, JsonbRef,
ListRef, StructRef,
Expand Down Expand Up @@ -477,6 +478,18 @@ impl<'a> HashKeySerDe<'a> for JsonbRef<'a> {
}
}

impl<'a> HashKeySerDe<'a> for Serial {
type S = <i64 as HashKeySerDe<'a>>::S;

fn serialize(self) -> Self::S {
self.into_inner().serialize()
}

fn deserialize<R: Read>(source: &mut R) -> Self {
i64::deserialize(source).into()
}
}

impl<'a> HashKeySerDe<'a> for StructRef<'a> {
type S = Vec<u8>;

Expand Down
Loading