From 76c32b955cc1f8f790252add0e78a02efd9a1583 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Mon, 6 Mar 2023 14:40:43 +0800 Subject: [PATCH 1/8] Add new SERIAL type & related changes. --- dashboard/proto/gen/data.ts | 6 ++ proto/data.proto | 1 + src/common/src/array/arrow.rs | 1 + src/common/src/array/column_proto_readers.rs | 18 ++++- src/common/src/array/mod.rs | 9 +++ src/common/src/array/serial_array.rs | 73 ++++++++++++++++++++ src/common/src/hash/key.rs | 14 ++++ 7 files changed, 120 insertions(+), 2 deletions(-) diff --git a/dashboard/proto/gen/data.ts b/dashboard/proto/gen/data.ts index f34bbab892582..1b865bc65a17a 100644 --- a/dashboard/proto/gen/data.ts +++ b/dashboard/proto/gen/data.ts @@ -21,6 +21,7 @@ export const RwArrayType = { LIST: "LIST", BYTEA: "BYTEA", JSONB: "JSONB", + SERIAL: "SERIAL", UNRECOGNIZED: "UNRECOGNIZED", } as const; @@ -79,6 +80,9 @@ export function rwArrayTypeFromJSON(object: any): RwArrayType { case 16: case "JSONB": return RwArrayType.JSONB; + case 17: + case "SERIAL": + return RwArrayType.SERIAL; case -1: case "UNRECOGNIZED": default: @@ -122,6 +126,8 @@ export function rwArrayTypeToJSON(object: RwArrayType): string { return "BYTEA"; case RwArrayType.JSONB: return "JSONB"; + case RwArrayType.SERIAL: + return "SERIAL"; case RwArrayType.UNRECOGNIZED: default: return "UNRECOGNIZED"; diff --git a/proto/data.proto b/proto/data.proto index 0db71cf446773..fba6ec6f4c1c0 100644 --- a/proto/data.proto +++ b/proto/data.proto @@ -97,6 +97,7 @@ enum ArrayType { LIST = 14; BYTEA = 15; JSONB = 16; + SERIAL = 17; } message Array { diff --git a/src/common/src/array/arrow.rs b/src/common/src/array/arrow.rs index d1bbff8f329e4..7fb068252b4f6 100644 --- a/src/common/src/array/arrow.rs +++ b/src/common/src/array/arrow.rs @@ -444,6 +444,7 @@ impl From<&ListArray> for arrow_array::ListArray { |b, v| b.append_option(v.map(|d| d.into_arrow())), ), ArrayImpl::Jsonb(_) => todo!("list of jsonb"), + ArrayImpl::Serial(_) => todo!("list of serial"), ArrayImpl::Struct(_) => todo!("list of struct"), ArrayImpl::List(_) => todo!("list of list"), ArrayImpl::Bytea(a) => build( diff --git a/src/common/src/array/column_proto_readers.rs b/src/common/src/array/column_proto_readers.rs index f1e85f2e40d22..dfa0b40e2c604 100644 --- a/src/common/src/array/column_proto_readers.rs +++ b/src/common/src/array/column_proto_readers.rs @@ -23,8 +23,9 @@ use crate::array::value_reader::{PrimitiveValueReader, VarSizedValueReader}; use crate::array::{ Array, ArrayBuilder, ArrayImpl, ArrayMeta, ArrayResult, BoolArray, IntervalArrayBuilder, NaiveDateArrayBuilder, NaiveDateTimeArrayBuilder, NaiveTimeArrayBuilder, PrimitiveArrayBuilder, - PrimitiveArrayItemType, + PrimitiveArrayItemType,SerialArrayBuilder, }; +use crate::array::serial_array::Serial; use crate::buffer::Bitmap; use crate::types::interval::IntervalUnit; use crate::types::{NaiveDateTimeWrapper, NaiveDateWrapper, NaiveTimeWrapper}; @@ -97,6 +98,18 @@ fn read_naive_date_time(cursor: &mut Cursor<&[u8]>) -> ArrayResult) -> ArrayResult { + todo!() + // cursor + // .read_i64::() + // .map_err(|e| anyhow!("Failed to read i64 from NaiveDateTime buffer: {}", e)) + // .and_then(|t| NaiveDateTimeWrapper::with_macros(t).map_err(|e| anyhow!("{}", e))) + // .map_err(Into::into) +} + + + pub fn read_interval_unit(cursor: &mut Cursor<&[u8]>) -> ArrayResult { let mut read = || { let months = cursor.read_i32::()?; @@ -148,7 +161,8 @@ read_one_value_array! { { IntervalUnit, IntervalArrayBuilder }, { NaiveDate, NaiveDateArrayBuilder }, { NaiveTime, NaiveTimeArrayBuilder }, - { NaiveDateTime, NaiveDateTimeArrayBuilder } + { NaiveDateTime, NaiveDateTimeArrayBuilder }, + { Serial, SerialArrayBuilder } } fn read_offset(offset_cursor: &mut Cursor<&[u8]>) -> ArrayResult { diff --git a/src/common/src/array/mod.rs b/src/common/src/array/mod.rs index c125b77ff8a31..dc76aeb7de9ed 100644 --- a/src/common/src/array/mod.rs +++ b/src/common/src/array/mod.rs @@ -65,6 +65,7 @@ pub use utf8_array::*; pub use vis::{Vis, VisRef}; pub use self::error::ArrayError; +use crate::array::serial_array::{SerialArray, SerialArrayBuilder}; use crate::buffer::Bitmap; use crate::types::*; use crate::util::iter_util::ZipEqFast; @@ -347,6 +348,7 @@ macro_rules! for_all_variants { { NaiveDateTime, naivedatetime, NaiveDateTimeArray, NaiveDateTimeArrayBuilder }, { NaiveTime, naivetime, NaiveTimeArray, NaiveTimeArrayBuilder }, { Jsonb, jsonb, JsonbArray, JsonbArrayBuilder }, + { Serial, serial, SerialArray, SerialArrayBuilder }, { Struct, struct, StructArray, StructArrayBuilder }, { List, list, ListArray, ListArrayBuilder }, { Bytea, bytea, BytesArray, BytesArrayBuilder} @@ -391,6 +393,12 @@ impl From for ArrayImpl { } } +impl From for ArrayImpl { + fn from(arr: SerialArray) -> Self { + Self::Serial(arr) + } +} + impl From for ArrayImpl { fn from(arr: StructArray) -> Self { Self::Struct(arr) @@ -687,6 +695,7 @@ impl ArrayImpl { ProstArrayType::Jsonb => { read_string_array::(array, cardinality)? } + ProstArrayType::Serial => read_serial_array(array, cardinality)?, ProstArrayType::Struct => StructArray::from_protobuf(array)?, ProstArrayType::List => ListArray::from_protobuf(array)?, ProstArrayType::Unspecified => unreachable!(), diff --git a/src/common/src/array/serial_array.rs b/src/common/src/array/serial_array.rs index ae1b0f10c7773..69d2f571b26ae 100644 --- a/src/common/src/array/serial_array.rs +++ b/src/common/src/array/serial_array.rs @@ -17,11 +17,19 @@ use std::hash::Hash; use postgres_types::{ToSql as _, Type}; use serde::{Serialize, Serializer}; +use crate::array::{Array, ArrayBuilder, PrimitiveArray, PrimitiveArrayBuilder}; +use crate::buffer::Bitmap; use crate::types::{Scalar, ScalarRef}; #[derive(Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd)] pub struct Serial(i64); +impl From for Serial { + fn from(value: i64) -> Self { + Self(value) + } +} + impl Serial { #[inline] pub fn into_inner(self) -> i64 { @@ -84,3 +92,68 @@ impl<'a> ScalarRef<'a> for Serial { self.0.hash(state) } } + +#[derive(Debug)] +pub struct SerialArrayBuilder(PrimitiveArrayBuilder); + +#[derive(Debug, Clone)] +pub struct SerialArray(PrimitiveArray); + +impl ArrayBuilder for SerialArrayBuilder { + type ArrayType = SerialArray; + + fn with_meta(capacity: usize, _meta: super::ArrayMeta) -> Self { + Self(PrimitiveArrayBuilder::::with_meta(capacity, _meta)) + } + + fn append_n(&mut self, n: usize, value: Option<::RefItem<'_>>) { + self.0.append_n(n, value.map(|x| x.0)); + } + + fn append_array(&mut self, other: &Self::ArrayType) { + self.0.append_array(&other.0); + } + + fn pop(&mut self) -> Option<()> { + self.0.pop() + } + + fn finish(self) -> Self::ArrayType { + SerialArray(self.0.finish()) + } +} + +impl Array for SerialArray { + type Builder = SerialArrayBuilder; + type OwnedItem = Serial; + type RefItem<'a> = Serial; + + unsafe fn raw_value_at_unchecked(&self, idx: usize) -> Self::RefItem<'_> { + Serial(self.0.raw_value_at_unchecked(idx)) + } + + fn len(&self) -> usize { + self.0.len() + } + + fn to_protobuf(&self) -> super::ProstArray { + self.0.to_protobuf() + } + + fn null_bitmap(&self) -> &Bitmap { + self.0.null_bitmap() + } + + fn into_null_bitmap(self) -> Bitmap { + self.0.into_null_bitmap() + } + + fn set_bitmap(&mut self, bitmap: Bitmap) { + self.0.set_bitmap(bitmap) + } + + fn create_builder(&self, capacity: usize) -> super::ArrayBuilderImpl { + let array_builder = Self::Builder::new(capacity); + super::ArrayBuilderImpl::Serial(array_builder) + } +} diff --git a/src/common/src/hash/key.rs b/src/common/src/hash/key.rs index 48901111f3288..db608d4044e8d 100644 --- a/src/common/src/hash/key.rs +++ b/src/common/src/hash/key.rs @@ -30,6 +30,7 @@ use std::io::{Cursor, Read}; use chrono::{Datelike, Timelike}; use fixedbitset::FixedBitSet; +use crate::array::serial_array::Serial; use crate::array::{ Array, ArrayBuilder, ArrayBuilderImpl, ArrayError, ArrayImpl, ArrayResult, DataChunk, JsonbRef, ListRef, StructRef, @@ -477,6 +478,19 @@ impl<'a> HashKeySerDe<'a> for JsonbRef<'a> { } } +impl<'a> HashKeySerDe<'a> for Serial { + type S = [u8; 8]; + + fn serialize(self) -> Self::S { + self.into_inner().to_ne_bytes() + } + + fn deserialize(source: &mut R) -> Self { + let value = Self::read_fixed_size_bytes::(source); + Self::from(i64::from_ne_bytes(value)) + } +} + impl<'a> HashKeySerDe<'a> for StructRef<'a> { type S = Vec; From 44aa64b16643d831d061b5c70f1d8019da3564de Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Mon, 6 Mar 2023 14:54:49 +0800 Subject: [PATCH 2/8] Import new module and use for `read_serial`; import `SerialArrayBuilder` from new location; cleanup code. --- src/common/src/array/column_proto_readers.rs | 21 ++++++++------------ 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/src/common/src/array/column_proto_readers.rs b/src/common/src/array/column_proto_readers.rs index dfa0b40e2c604..a2a22c34a6f19 100644 --- a/src/common/src/array/column_proto_readers.rs +++ b/src/common/src/array/column_proto_readers.rs @@ -19,13 +19,13 @@ use byteorder::{BigEndian, ReadBytesExt}; use paste::paste; use risingwave_pb::data::Array as ProstArray; -use crate::array::value_reader::{PrimitiveValueReader, VarSizedValueReader}; +use crate::array::serial_array::Serial; +use crate::array::value_reader::{I64ValueReader, PrimitiveValueReader, VarSizedValueReader}; use crate::array::{ Array, ArrayBuilder, ArrayImpl, ArrayMeta, ArrayResult, BoolArray, IntervalArrayBuilder, NaiveDateArrayBuilder, NaiveDateTimeArrayBuilder, NaiveTimeArrayBuilder, PrimitiveArrayBuilder, - PrimitiveArrayItemType,SerialArrayBuilder, + PrimitiveArrayItemType, SerialArrayBuilder, }; -use crate::array::serial_array::Serial; use crate::buffer::Bitmap; use crate::types::interval::IntervalUnit; use crate::types::{NaiveDateTimeWrapper, NaiveDateWrapper, NaiveTimeWrapper}; @@ -98,18 +98,13 @@ fn read_naive_date_time(cursor: &mut Cursor<&[u8]>) -> ArrayResult) -> ArrayResult { - todo!() - // cursor - // .read_i64::() - // .map_err(|e| anyhow!("Failed to read i64 from NaiveDateTime buffer: {}", e)) - // .and_then(|t| NaiveDateTimeWrapper::with_macros(t).map_err(|e| anyhow!("{}", e))) - // .map_err(Into::into) +fn read_serial(cursor: &mut Cursor<&[u8]>) -> ArrayResult { + I64ValueReader::read(cursor) + .map_err(|e| anyhow!("Failed to read i64 from Serial buffer: {}", e)) + .map(|t| Serial::from(t)) + .map_err(Into::into) } - - pub fn read_interval_unit(cursor: &mut Cursor<&[u8]>) -> ArrayResult { let mut read = || { let months = cursor.read_i32::()?; From 39da4f85b5e392f202e03d52c8459cfe27459806 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Mon, 6 Mar 2023 14:55:06 +0800 Subject: [PATCH 3/8] Implement HashKeySerDe for JsonbRef and Serial types --- src/common/src/hash/key.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/common/src/hash/key.rs b/src/common/src/hash/key.rs index db608d4044e8d..0856bda158daa 100644 --- a/src/common/src/hash/key.rs +++ b/src/common/src/hash/key.rs @@ -479,15 +479,14 @@ impl<'a> HashKeySerDe<'a> for JsonbRef<'a> { } impl<'a> HashKeySerDe<'a> for Serial { - type S = [u8; 8]; + type S = >::S; fn serialize(self) -> Self::S { - self.into_inner().to_ne_bytes() + self.into_inner().serialize() } fn deserialize(source: &mut R) -> Self { - let value = Self::read_fixed_size_bytes::(source); - Self::from(i64::from_ne_bytes(value)) + i64::deserialize(source).into() } } From 0eb8c162560a27fc8df06c6ef6bcd2a28dfbaa9d Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Mon, 6 Mar 2023 15:05:22 +0800 Subject: [PATCH 4/8] Simplify `read_serial` with `.map(Serial::from)` --- src/common/src/array/column_proto_readers.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/src/array/column_proto_readers.rs b/src/common/src/array/column_proto_readers.rs index a2a22c34a6f19..b6f80c1e857e7 100644 --- a/src/common/src/array/column_proto_readers.rs +++ b/src/common/src/array/column_proto_readers.rs @@ -101,7 +101,7 @@ fn read_naive_date_time(cursor: &mut Cursor<&[u8]>) -> ArrayResult) -> ArrayResult { I64ValueReader::read(cursor) .map_err(|e| anyhow!("Failed to read i64 from Serial buffer: {}", e)) - .map(|t| Serial::from(t)) + .map(Serial::from) .map_err(Into::into) } From f37ef4c1f4bd57bfbe10116e5eee277d057bd896 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Mon, 6 Mar 2023 15:05:33 +0800 Subject: [PATCH 5/8] Implement `RandValue` trait, import `SerialArray` and use in `tests`. --- src/common/src/test_utils/rand_array.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/common/src/test_utils/rand_array.rs b/src/common/src/test_utils/rand_array.rs index 01365d85c325d..83b4b884d8d2d 100644 --- a/src/common/src/test_utils/rand_array.rs +++ b/src/common/src/test_utils/rand_array.rs @@ -25,6 +25,7 @@ use rand::prelude::Distribution; use rand::rngs::SmallRng; use rand::{Rng, SeedableRng}; +use crate::array::serial_array::Serial; use crate::array::{Array, ArrayBuilder, ArrayRef, JsonbVal, ListValue, StructValue}; use crate::types::{ Decimal, IntervalUnit, NaiveDateTimeWrapper, NaiveDateWrapper, NaiveTimeWrapper, NativeType, @@ -117,6 +118,12 @@ impl RandValue for bool { } } +impl RandValue for Serial { + fn rand_value(rand: &mut R) -> Self { + i64::rand_value(rand).into() + } +} + impl RandValue for JsonbVal { fn rand_value(_rand: &mut R) -> Self { JsonbVal::dummy() @@ -177,6 +184,7 @@ where mod tests { use super::*; use crate::array::interval_array::IntervalArray; + use crate::array::serial_array::SerialArray; use crate::array::*; use crate::for_all_variants; From 01212abff44e6ca5eb3b2b6ab8969b1f540e1e39 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Mon, 6 Mar 2023 15:18:00 +0800 Subject: [PATCH 6/8] Rebuild dashboard proto --- dashboard/proto/gen/task_service.ts | 115 +++++++++------------------- 1 file changed, 38 insertions(+), 77 deletions(-) diff --git a/dashboard/proto/gen/task_service.ts b/dashboard/proto/gen/task_service.ts index 8446c702d48b5..2cfcb1d49bf04 100644 --- a/dashboard/proto/gen/task_service.ts +++ b/dashboard/proto/gen/task_service.ts @@ -13,12 +13,14 @@ export interface TaskId { taskId: number; } -export interface TaskInfo { +export interface TaskInfoResponse { taskId: TaskId1 | undefined; - taskStatus: TaskInfo_TaskStatus; + taskStatus: TaskInfoResponse_TaskStatus; + /** Optional error message for failed task. */ + errorMessage: string; } -export const TaskInfo_TaskStatus = { +export const TaskInfoResponse_TaskStatus = { /** UNSPECIFIED - Note: Requirement of proto3: first enum must be 0. */ UNSPECIFIED: "UNSPECIFIED", PENDING: "PENDING", @@ -29,50 +31,50 @@ export const TaskInfo_TaskStatus = { UNRECOGNIZED: "UNRECOGNIZED", } as const; -export type TaskInfo_TaskStatus = typeof TaskInfo_TaskStatus[keyof typeof TaskInfo_TaskStatus]; +export type TaskInfoResponse_TaskStatus = typeof TaskInfoResponse_TaskStatus[keyof typeof TaskInfoResponse_TaskStatus]; -export function taskInfo_TaskStatusFromJSON(object: any): TaskInfo_TaskStatus { +export function taskInfoResponse_TaskStatusFromJSON(object: any): TaskInfoResponse_TaskStatus { switch (object) { case 0: case "UNSPECIFIED": - return TaskInfo_TaskStatus.UNSPECIFIED; + return TaskInfoResponse_TaskStatus.UNSPECIFIED; case 2: case "PENDING": - return TaskInfo_TaskStatus.PENDING; + return TaskInfoResponse_TaskStatus.PENDING; case 3: case "RUNNING": - return TaskInfo_TaskStatus.RUNNING; + return TaskInfoResponse_TaskStatus.RUNNING; case 6: case "FINISHED": - return TaskInfo_TaskStatus.FINISHED; + return TaskInfoResponse_TaskStatus.FINISHED; case 7: case "FAILED": - return TaskInfo_TaskStatus.FAILED; + return TaskInfoResponse_TaskStatus.FAILED; case 8: case "ABORTED": - return TaskInfo_TaskStatus.ABORTED; + return TaskInfoResponse_TaskStatus.ABORTED; case -1: case "UNRECOGNIZED": default: - return TaskInfo_TaskStatus.UNRECOGNIZED; + return TaskInfoResponse_TaskStatus.UNRECOGNIZED; } } -export function taskInfo_TaskStatusToJSON(object: TaskInfo_TaskStatus): string { +export function taskInfoResponse_TaskStatusToJSON(object: TaskInfoResponse_TaskStatus): string { switch (object) { - case TaskInfo_TaskStatus.UNSPECIFIED: + case TaskInfoResponse_TaskStatus.UNSPECIFIED: return "UNSPECIFIED"; - case TaskInfo_TaskStatus.PENDING: + case TaskInfoResponse_TaskStatus.PENDING: return "PENDING"; - case TaskInfo_TaskStatus.RUNNING: + case TaskInfoResponse_TaskStatus.RUNNING: return "RUNNING"; - case TaskInfo_TaskStatus.FINISHED: + case TaskInfoResponse_TaskStatus.FINISHED: return "FINISHED"; - case TaskInfo_TaskStatus.FAILED: + case TaskInfoResponse_TaskStatus.FAILED: return "FAILED"; - case TaskInfo_TaskStatus.ABORTED: + case TaskInfoResponse_TaskStatus.ABORTED: return "ABORTED"; - case TaskInfo_TaskStatus.UNRECOGNIZED: + case TaskInfoResponse_TaskStatus.UNRECOGNIZED: default: return "UNRECOGNIZED"; } @@ -96,13 +98,7 @@ export interface GetTaskInfoRequest { taskId: TaskId1 | undefined; } -export interface TaskInfoResponse { - status: Status | undefined; - taskInfo: TaskInfo | undefined; -} - export interface GetDataResponse { - status: Status | undefined; recordBatch: DataChunk | undefined; } @@ -174,33 +170,36 @@ export const TaskId = { }, }; -function createBaseTaskInfo(): TaskInfo { - return { taskId: undefined, taskStatus: TaskInfo_TaskStatus.UNSPECIFIED }; +function createBaseTaskInfoResponse(): TaskInfoResponse { + return { taskId: undefined, taskStatus: TaskInfoResponse_TaskStatus.UNSPECIFIED, errorMessage: "" }; } -export const TaskInfo = { - fromJSON(object: any): TaskInfo { +export const TaskInfoResponse = { + fromJSON(object: any): TaskInfoResponse { return { taskId: isSet(object.taskId) ? TaskId1.fromJSON(object.taskId) : undefined, taskStatus: isSet(object.taskStatus) - ? taskInfo_TaskStatusFromJSON(object.taskStatus) - : TaskInfo_TaskStatus.UNSPECIFIED, + ? taskInfoResponse_TaskStatusFromJSON(object.taskStatus) + : TaskInfoResponse_TaskStatus.UNSPECIFIED, + errorMessage: isSet(object.errorMessage) ? String(object.errorMessage) : "", }; }, - toJSON(message: TaskInfo): unknown { + toJSON(message: TaskInfoResponse): unknown { const obj: any = {}; message.taskId !== undefined && (obj.taskId = message.taskId ? TaskId1.toJSON(message.taskId) : undefined); - message.taskStatus !== undefined && (obj.taskStatus = taskInfo_TaskStatusToJSON(message.taskStatus)); + message.taskStatus !== undefined && (obj.taskStatus = taskInfoResponse_TaskStatusToJSON(message.taskStatus)); + message.errorMessage !== undefined && (obj.errorMessage = message.errorMessage); return obj; }, - fromPartial, I>>(object: I): TaskInfo { - const message = createBaseTaskInfo(); + fromPartial, I>>(object: I): TaskInfoResponse { + const message = createBaseTaskInfoResponse(); message.taskId = (object.taskId !== undefined && object.taskId !== null) ? TaskId1.fromPartial(object.taskId) : undefined; - message.taskStatus = object.taskStatus ?? TaskInfo_TaskStatus.UNSPECIFIED; + message.taskStatus = object.taskStatus ?? TaskInfoResponse_TaskStatus.UNSPECIFIED; + message.errorMessage = object.errorMessage ?? ""; return message; }, }; @@ -313,52 +312,17 @@ export const GetTaskInfoRequest = { }, }; -function createBaseTaskInfoResponse(): TaskInfoResponse { - return { status: undefined, taskInfo: undefined }; -} - -export const TaskInfoResponse = { - fromJSON(object: any): TaskInfoResponse { - return { - status: isSet(object.status) ? Status.fromJSON(object.status) : undefined, - taskInfo: isSet(object.taskInfo) ? TaskInfo.fromJSON(object.taskInfo) : undefined, - }; - }, - - toJSON(message: TaskInfoResponse): unknown { - const obj: any = {}; - message.status !== undefined && (obj.status = message.status ? Status.toJSON(message.status) : undefined); - message.taskInfo !== undefined && (obj.taskInfo = message.taskInfo ? TaskInfo.toJSON(message.taskInfo) : undefined); - return obj; - }, - - fromPartial, I>>(object: I): TaskInfoResponse { - const message = createBaseTaskInfoResponse(); - message.status = (object.status !== undefined && object.status !== null) - ? Status.fromPartial(object.status) - : undefined; - message.taskInfo = (object.taskInfo !== undefined && object.taskInfo !== null) - ? TaskInfo.fromPartial(object.taskInfo) - : undefined; - return message; - }, -}; - function createBaseGetDataResponse(): GetDataResponse { - return { status: undefined, recordBatch: undefined }; + return { recordBatch: undefined }; } export const GetDataResponse = { fromJSON(object: any): GetDataResponse { - return { - status: isSet(object.status) ? Status.fromJSON(object.status) : undefined, - recordBatch: isSet(object.recordBatch) ? DataChunk.fromJSON(object.recordBatch) : undefined, - }; + return { recordBatch: isSet(object.recordBatch) ? DataChunk.fromJSON(object.recordBatch) : undefined }; }, toJSON(message: GetDataResponse): unknown { const obj: any = {}; - message.status !== undefined && (obj.status = message.status ? Status.toJSON(message.status) : undefined); message.recordBatch !== undefined && (obj.recordBatch = message.recordBatch ? DataChunk.toJSON(message.recordBatch) : undefined); return obj; @@ -366,9 +330,6 @@ export const GetDataResponse = { fromPartial, I>>(object: I): GetDataResponse { const message = createBaseGetDataResponse(); - message.status = (object.status !== undefined && object.status !== null) - ? Status.fromPartial(object.status) - : undefined; message.recordBatch = (object.recordBatch !== undefined && object.recordBatch !== null) ? DataChunk.fromPartial(object.recordBatch) : undefined; From a9a6ad5d8378c238b1b59c630ae0967c9c4d68df Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Mon, 6 Mar 2023 18:01:45 +0800 Subject: [PATCH 7/8] Restructure code for serial data. --- src/common/src/array/mod.rs | 6 -- src/common/src/array/primitive_array.rs | 1 + src/common/src/array/serial_array.rs | 96 ++----------------------- src/common/src/types/mod.rs | 1 + src/common/src/types/native_type.rs | 9 +++ 5 files changed, 16 insertions(+), 97 deletions(-) diff --git a/src/common/src/array/mod.rs b/src/common/src/array/mod.rs index dc76aeb7de9ed..36cf71a6c4e92 100644 --- a/src/common/src/array/mod.rs +++ b/src/common/src/array/mod.rs @@ -393,12 +393,6 @@ impl From for ArrayImpl { } } -impl From for ArrayImpl { - fn from(arr: SerialArray) -> Self { - Self::Serial(arr) - } -} - impl From for ArrayImpl { fn from(arr: StructArray) -> Self { Self::Struct(arr) diff --git a/src/common/src/array/primitive_array.rs b/src/common/src/array/primitive_array.rs index 8d521bea3fcb0..00601851dc733 100644 --- a/src/common/src/array/primitive_array.rs +++ b/src/common/src/array/primitive_array.rs @@ -21,6 +21,7 @@ use risingwave_pb::common::Buffer; use risingwave_pb::data::{Array as ProstArray, ArrayType}; use super::{Array, ArrayBuilder, ArrayResult}; +use crate::array::serial_array::Serial; use crate::array::{ArrayBuilderImpl, ArrayImpl, ArrayMeta}; use crate::buffer::{Bitmap, BitmapBuilder}; use crate::for_all_native_types; diff --git a/src/common/src/array/serial_array.rs b/src/common/src/array/serial_array.rs index 69d2f571b26ae..cf80a939448c9 100644 --- a/src/common/src/array/serial_array.rs +++ b/src/common/src/array/serial_array.rs @@ -17,11 +17,9 @@ use std::hash::Hash; use postgres_types::{ToSql as _, Type}; use serde::{Serialize, Serializer}; -use crate::array::{Array, ArrayBuilder, PrimitiveArray, PrimitiveArrayBuilder}; -use crate::buffer::Bitmap; -use crate::types::{Scalar, ScalarRef}; +use crate::array::{PrimitiveArray, PrimitiveArrayBuilder}; -#[derive(Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd, Default, Hash)] pub struct Serial(i64); impl From for Serial { @@ -30,6 +28,9 @@ impl From for Serial { } } +pub type SerialArray = PrimitiveArray; +pub type SerialArrayBuilder = PrimitiveArrayBuilder; + impl Serial { #[inline] pub fn into_inner(self) -> i64 { @@ -70,90 +71,3 @@ impl crate::types::to_binary::ToBinary for Serial { Ok(Some(output.freeze())) } } - -/// Implement `Scalar` for `Serial`. -impl Scalar for Serial { - type ScalarRefType<'a> = Serial; - - fn as_scalar_ref(&self) -> Self::ScalarRefType<'_> { - Serial(self.0) - } -} - -/// Implement `ScalarRef` for `Serial`. -impl<'a> ScalarRef<'a> for Serial { - type ScalarType = Serial; - - fn to_owned_scalar(&self) -> Serial { - *self - } - - fn hash_scalar(&self, state: &mut H) { - self.0.hash(state) - } -} - -#[derive(Debug)] -pub struct SerialArrayBuilder(PrimitiveArrayBuilder); - -#[derive(Debug, Clone)] -pub struct SerialArray(PrimitiveArray); - -impl ArrayBuilder for SerialArrayBuilder { - type ArrayType = SerialArray; - - fn with_meta(capacity: usize, _meta: super::ArrayMeta) -> Self { - Self(PrimitiveArrayBuilder::::with_meta(capacity, _meta)) - } - - fn append_n(&mut self, n: usize, value: Option<::RefItem<'_>>) { - self.0.append_n(n, value.map(|x| x.0)); - } - - fn append_array(&mut self, other: &Self::ArrayType) { - self.0.append_array(&other.0); - } - - fn pop(&mut self) -> Option<()> { - self.0.pop() - } - - fn finish(self) -> Self::ArrayType { - SerialArray(self.0.finish()) - } -} - -impl Array for SerialArray { - type Builder = SerialArrayBuilder; - type OwnedItem = Serial; - type RefItem<'a> = Serial; - - unsafe fn raw_value_at_unchecked(&self, idx: usize) -> Self::RefItem<'_> { - Serial(self.0.raw_value_at_unchecked(idx)) - } - - fn len(&self) -> usize { - self.0.len() - } - - fn to_protobuf(&self) -> super::ProstArray { - self.0.to_protobuf() - } - - fn null_bitmap(&self) -> &Bitmap { - self.0.null_bitmap() - } - - fn into_null_bitmap(self) -> Bitmap { - self.0.into_null_bitmap() - } - - fn set_bitmap(&mut self, bitmap: Bitmap) { - self.0.set_bitmap(bitmap) - } - - fn create_builder(&self, capacity: usize) -> super::ArrayBuilderImpl { - let array_builder = Self::Builder::new(capacity); - super::ArrayBuilderImpl::Serial(array_builder) - } -} diff --git a/src/common/src/types/mod.rs b/src/common/src/types/mod.rs index 62bf6b92b3c17..088abd7f5eb86 100644 --- a/src/common/src/types/mod.rs +++ b/src/common/src/types/mod.rs @@ -641,6 +641,7 @@ macro_rules! for_all_native_types { { i16, Int16 }, { i32, Int32 }, { i64, Int64 }, + { Serial, Serial }, { $crate::types::OrderedF32, Float32 }, { $crate::types::OrderedF64, Float64 } } diff --git a/src/common/src/types/native_type.rs b/src/common/src/types/native_type.rs index ed42ead8865e8..c655b448ed86c 100644 --- a/src/common/src/types/native_type.rs +++ b/src/common/src/types/native_type.rs @@ -16,6 +16,7 @@ use std::fmt::Debug; use std::io::Write; use super::{OrderedF32, OrderedF64}; +use crate::array::serial_array::Serial; use crate::array::ArrayResult; pub trait NativeType: @@ -42,6 +43,14 @@ impl NativeType for i64 { } } +impl NativeType for Serial { + fn to_protobuf(self, output: &mut T) -> ArrayResult { + output + .write(&self.into_inner().to_be_bytes()) + .map_err(Into::into) + } +} + impl NativeType for OrderedF32 { fn to_protobuf(self, output: &mut T) -> ArrayResult { output.write(&self.to_be_bytes()).map_err(Into::into) From 5cd434b6e271ca1b96f352db3fa48bc9131a194a Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Mon, 6 Mar 2023 23:32:08 +0800 Subject: [PATCH 8/8] Refactor `Serial` references and add to `PrimitiveArray` --- src/common/src/array/column_proto_readers.rs | 15 +++------------ src/common/src/array/mod.rs | 6 ++++-- src/common/src/array/serial_array.rs | 7 ++++--- src/common/src/array/value_reader.rs | 5 ++++- src/common/src/test_utils/rand_array.rs | 1 + 5 files changed, 16 insertions(+), 18 deletions(-) diff --git a/src/common/src/array/column_proto_readers.rs b/src/common/src/array/column_proto_readers.rs index b6f80c1e857e7..f1e85f2e40d22 100644 --- a/src/common/src/array/column_proto_readers.rs +++ b/src/common/src/array/column_proto_readers.rs @@ -19,12 +19,11 @@ use byteorder::{BigEndian, ReadBytesExt}; use paste::paste; use risingwave_pb::data::Array as ProstArray; -use crate::array::serial_array::Serial; -use crate::array::value_reader::{I64ValueReader, PrimitiveValueReader, VarSizedValueReader}; +use crate::array::value_reader::{PrimitiveValueReader, VarSizedValueReader}; use crate::array::{ Array, ArrayBuilder, ArrayImpl, ArrayMeta, ArrayResult, BoolArray, IntervalArrayBuilder, NaiveDateArrayBuilder, NaiveDateTimeArrayBuilder, NaiveTimeArrayBuilder, PrimitiveArrayBuilder, - PrimitiveArrayItemType, SerialArrayBuilder, + PrimitiveArrayItemType, }; use crate::buffer::Bitmap; use crate::types::interval::IntervalUnit; @@ -98,13 +97,6 @@ fn read_naive_date_time(cursor: &mut Cursor<&[u8]>) -> ArrayResult) -> ArrayResult { - I64ValueReader::read(cursor) - .map_err(|e| anyhow!("Failed to read i64 from Serial buffer: {}", e)) - .map(Serial::from) - .map_err(Into::into) -} - pub fn read_interval_unit(cursor: &mut Cursor<&[u8]>) -> ArrayResult { let mut read = || { let months = cursor.read_i32::()?; @@ -156,8 +148,7 @@ read_one_value_array! { { IntervalUnit, IntervalArrayBuilder }, { NaiveDate, NaiveDateArrayBuilder }, { NaiveTime, NaiveTimeArrayBuilder }, - { NaiveDateTime, NaiveDateTimeArrayBuilder }, - { Serial, SerialArrayBuilder } + { NaiveDateTime, NaiveDateTimeArrayBuilder } } fn read_offset(offset_cursor: &mut Cursor<&[u8]>) -> ArrayResult { diff --git a/src/common/src/array/mod.rs b/src/common/src/array/mod.rs index 36cf71a6c4e92..7e955215bccc6 100644 --- a/src/common/src/array/mod.rs +++ b/src/common/src/array/mod.rs @@ -65,7 +65,7 @@ pub use utf8_array::*; pub use vis::{Vis, VisRef}; pub use self::error::ArrayError; -use crate::array::serial_array::{SerialArray, SerialArrayBuilder}; +use crate::array::serial_array::{Serial, SerialArray, SerialArrayBuilder}; use crate::buffer::Bitmap; use crate::types::*; use crate::util::iter_util::ZipEqFast; @@ -669,6 +669,9 @@ impl ArrayImpl { ProstArrayType::Int16 => read_numeric_array::(array, cardinality)?, ProstArrayType::Int32 => read_numeric_array::(array, cardinality)?, ProstArrayType::Int64 => read_numeric_array::(array, cardinality)?, + ProstArrayType::Serial => { + read_numeric_array::(array, cardinality)? + } ProstArrayType::Float32 => { read_numeric_array::(array, cardinality)? } @@ -689,7 +692,6 @@ impl ArrayImpl { ProstArrayType::Jsonb => { read_string_array::(array, cardinality)? } - ProstArrayType::Serial => read_serial_array(array, cardinality)?, ProstArrayType::Struct => StructArray::from_protobuf(array)?, ProstArrayType::List => ListArray::from_protobuf(array)?, ProstArrayType::Unspecified => unreachable!(), diff --git a/src/common/src/array/serial_array.rs b/src/common/src/array/serial_array.rs index cf80a939448c9..0854112568440 100644 --- a/src/common/src/array/serial_array.rs +++ b/src/common/src/array/serial_array.rs @@ -19,18 +19,19 @@ use serde::{Serialize, Serializer}; use crate::array::{PrimitiveArray, PrimitiveArrayBuilder}; +// Serial is an alias for i64 #[derive(Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd, Default, Hash)] pub struct Serial(i64); +pub type SerialArray = PrimitiveArray; +pub type SerialArrayBuilder = PrimitiveArrayBuilder; + impl From for Serial { fn from(value: i64) -> Self { Self(value) } } -pub type SerialArray = PrimitiveArray; -pub type SerialArrayBuilder = PrimitiveArrayBuilder; - impl Serial { #[inline] pub fn into_inner(self) -> i64 { diff --git a/src/common/src/array/value_reader.rs b/src/common/src/array/value_reader.rs index 371f08bcd8002..294d38a382d73 100644 --- a/src/common/src/array/value_reader.rs +++ b/src/common/src/array/value_reader.rs @@ -19,7 +19,8 @@ use byteorder::{BigEndian, ReadBytesExt}; use super::ArrayResult; use crate::array::{ - ArrayBuilder, BytesArrayBuilder, JsonbArrayBuilder, PrimitiveArrayItemType, Utf8ArrayBuilder, + ArrayBuilder, BytesArrayBuilder, JsonbArrayBuilder, PrimitiveArrayItemType, Serial, + Utf8ArrayBuilder, }; use crate::types::{Decimal, OrderedF32, OrderedF64}; @@ -31,6 +32,7 @@ pub trait PrimitiveValueReader { pub struct I16ValueReader; pub struct I32ValueReader; pub struct I64ValueReader; +pub struct SerialValueReader; pub struct F32ValueReader; pub struct F64ValueReader; @@ -50,6 +52,7 @@ macro_rules! impl_numeric_value_reader { impl_numeric_value_reader!(i16, I16ValueReader, read_i16); impl_numeric_value_reader!(i32, I32ValueReader, read_i32); impl_numeric_value_reader!(i64, I64ValueReader, read_i64); +impl_numeric_value_reader!(Serial, SerialValueReader, read_i64); impl_numeric_value_reader!(OrderedF32, F32ValueReader, read_f32); impl_numeric_value_reader!(OrderedF64, F64ValueReader, read_f64); diff --git a/src/common/src/test_utils/rand_array.rs b/src/common/src/test_utils/rand_array.rs index 83b4b884d8d2d..6aa42d32acdbc 100644 --- a/src/common/src/test_utils/rand_array.rs +++ b/src/common/src/test_utils/rand_array.rs @@ -120,6 +120,7 @@ impl RandValue for bool { impl RandValue for Serial { fn rand_value(rand: &mut R) -> Self { + // TODO(peng), serial should be in format of RowId i64::rand_value(rand).into() } }