diff --git a/dashboard/proto/gen/data.ts b/dashboard/proto/gen/data.ts
index f34bbab892582..1b865bc65a17a 100644
--- a/dashboard/proto/gen/data.ts
+++ b/dashboard/proto/gen/data.ts
@@ -21,6 +21,7 @@ export const RwArrayType = {
   LIST: "LIST",
   BYTEA: "BYTEA",
   JSONB: "JSONB",
+  SERIAL: "SERIAL",
   UNRECOGNIZED: "UNRECOGNIZED",
 } as const;
 
@@ -79,6 +80,9 @@ export function rwArrayTypeFromJSON(object: any): RwArrayType {
     case 16:
     case "JSONB":
       return RwArrayType.JSONB;
+    case 17:
+    case "SERIAL":
+      return RwArrayType.SERIAL;
     case -1:
     case "UNRECOGNIZED":
     default:
@@ -122,6 +126,8 @@ export function rwArrayTypeToJSON(object: RwArrayType): string {
       return "BYTEA";
     case RwArrayType.JSONB:
       return "JSONB";
+    case RwArrayType.SERIAL:
+      return "SERIAL";
     case RwArrayType.UNRECOGNIZED:
     default:
       return "UNRECOGNIZED";
diff --git a/dashboard/proto/gen/task_service.ts b/dashboard/proto/gen/task_service.ts
index 8446c702d48b5..2cfcb1d49bf04 100644
--- a/dashboard/proto/gen/task_service.ts
+++ b/dashboard/proto/gen/task_service.ts
@@ -13,12 +13,14 @@ export interface TaskId {
   taskId: number;
 }
 
-export interface TaskInfo {
+export interface TaskInfoResponse {
   taskId: TaskId1 | undefined;
-  taskStatus: TaskInfo_TaskStatus;
+  taskStatus: TaskInfoResponse_TaskStatus;
+  /** Optional error message for failed task. */
+  errorMessage: string;
 }
 
-export const TaskInfo_TaskStatus = {
+export const TaskInfoResponse_TaskStatus = {
   /** UNSPECIFIED - Note: Requirement of proto3: first enum must be 0. */
   UNSPECIFIED: "UNSPECIFIED",
   PENDING: "PENDING",
@@ -29,50 +31,50 @@ export const TaskInfo_TaskStatus = {
   UNRECOGNIZED: "UNRECOGNIZED",
 } as const;
 
-export type TaskInfo_TaskStatus = typeof TaskInfo_TaskStatus[keyof typeof TaskInfo_TaskStatus];
+export type TaskInfoResponse_TaskStatus = typeof TaskInfoResponse_TaskStatus[keyof typeof TaskInfoResponse_TaskStatus];
 
-export function taskInfo_TaskStatusFromJSON(object: any): TaskInfo_TaskStatus {
+export function taskInfoResponse_TaskStatusFromJSON(object: any): TaskInfoResponse_TaskStatus {
   switch (object) {
     case 0:
     case "UNSPECIFIED":
-      return TaskInfo_TaskStatus.UNSPECIFIED;
+      return TaskInfoResponse_TaskStatus.UNSPECIFIED;
     case 2:
     case "PENDING":
-      return TaskInfo_TaskStatus.PENDING;
+      return TaskInfoResponse_TaskStatus.PENDING;
     case 3:
     case "RUNNING":
-      return TaskInfo_TaskStatus.RUNNING;
+      return TaskInfoResponse_TaskStatus.RUNNING;
     case 6:
     case "FINISHED":
-      return TaskInfo_TaskStatus.FINISHED;
+      return TaskInfoResponse_TaskStatus.FINISHED;
     case 7:
     case "FAILED":
-      return TaskInfo_TaskStatus.FAILED;
+      return TaskInfoResponse_TaskStatus.FAILED;
     case 8:
     case "ABORTED":
-      return TaskInfo_TaskStatus.ABORTED;
+      return TaskInfoResponse_TaskStatus.ABORTED;
     case -1:
     case "UNRECOGNIZED":
    default:
-      return TaskInfo_TaskStatus.UNRECOGNIZED;
+      return TaskInfoResponse_TaskStatus.UNRECOGNIZED;
   }
 }
 
-export function taskInfo_TaskStatusToJSON(object: TaskInfo_TaskStatus): string {
+export function taskInfoResponse_TaskStatusToJSON(object: TaskInfoResponse_TaskStatus): string {
   switch (object) {
-    case TaskInfo_TaskStatus.UNSPECIFIED:
+    case TaskInfoResponse_TaskStatus.UNSPECIFIED:
       return "UNSPECIFIED";
-    case TaskInfo_TaskStatus.PENDING:
+    case TaskInfoResponse_TaskStatus.PENDING:
       return "PENDING";
-    case TaskInfo_TaskStatus.RUNNING:
+    case TaskInfoResponse_TaskStatus.RUNNING:
       return "RUNNING";
-    case TaskInfo_TaskStatus.FINISHED:
+    case TaskInfoResponse_TaskStatus.FINISHED:
       return "FINISHED";
-    case TaskInfo_TaskStatus.FAILED:
+    case TaskInfoResponse_TaskStatus.FAILED:
       return "FAILED";
-    case TaskInfo_TaskStatus.ABORTED:
+    case TaskInfoResponse_TaskStatus.ABORTED:
       return "ABORTED";
-    case TaskInfo_TaskStatus.UNRECOGNIZED:
+    case TaskInfoResponse_TaskStatus.UNRECOGNIZED:
     default:
       return "UNRECOGNIZED";
   }
@@ -96,13 +98,7 @@ export interface GetTaskInfoRequest {
   taskId: TaskId1 | undefined;
 }
 
-export interface TaskInfoResponse {
-  status: Status | undefined;
-  taskInfo: TaskInfo | undefined;
-}
-
 export interface GetDataResponse {
-  status: Status | undefined;
   recordBatch: DataChunk | undefined;
 }
 
@@ -174,33 +170,36 @@ export const TaskId = {
   },
 };
 
-function createBaseTaskInfo(): TaskInfo {
-  return { taskId: undefined, taskStatus: TaskInfo_TaskStatus.UNSPECIFIED };
+function createBaseTaskInfoResponse(): TaskInfoResponse {
+  return { taskId: undefined, taskStatus: TaskInfoResponse_TaskStatus.UNSPECIFIED, errorMessage: "" };
 }
 
-export const TaskInfo = {
-  fromJSON(object: any): TaskInfo {
+export const TaskInfoResponse = {
+  fromJSON(object: any): TaskInfoResponse {
     return {
       taskId: isSet(object.taskId) ? TaskId1.fromJSON(object.taskId) : undefined,
       taskStatus: isSet(object.taskStatus)
-        ? taskInfo_TaskStatusFromJSON(object.taskStatus)
-        : TaskInfo_TaskStatus.UNSPECIFIED,
+        ? taskInfoResponse_TaskStatusFromJSON(object.taskStatus)
+        : TaskInfoResponse_TaskStatus.UNSPECIFIED,
+      errorMessage: isSet(object.errorMessage) ? String(object.errorMessage) : "",
     };
   },
 
-  toJSON(message: TaskInfo): unknown {
+  toJSON(message: TaskInfoResponse): unknown {
     const obj: any = {};
     message.taskId !== undefined && (obj.taskId = message.taskId ? TaskId1.toJSON(message.taskId) : undefined);
-    message.taskStatus !== undefined && (obj.taskStatus = taskInfo_TaskStatusToJSON(message.taskStatus));
+    message.taskStatus !== undefined && (obj.taskStatus = taskInfoResponse_TaskStatusToJSON(message.taskStatus));
+    message.errorMessage !== undefined && (obj.errorMessage = message.errorMessage);
     return obj;
   },
 
-  fromPartial<I extends Exact<DeepPartial<TaskInfo>, I>>(object: I): TaskInfo {
-    const message = createBaseTaskInfo();
+  fromPartial<I extends Exact<DeepPartial<TaskInfoResponse>, I>>(object: I): TaskInfoResponse {
+    const message = createBaseTaskInfoResponse();
     message.taskId = (object.taskId !== undefined && object.taskId !== null)
       ? TaskId1.fromPartial(object.taskId)
       : undefined;
-    message.taskStatus = object.taskStatus ?? TaskInfo_TaskStatus.UNSPECIFIED;
+    message.taskStatus = object.taskStatus ?? TaskInfoResponse_TaskStatus.UNSPECIFIED;
+    message.errorMessage = object.errorMessage ?? "";
     return message;
   },
 };
@@ -313,52 +312,17 @@ export const GetTaskInfoRequest = {
   },
 };
 
-function createBaseTaskInfoResponse(): TaskInfoResponse {
-  return { status: undefined, taskInfo: undefined };
-}
-
-export const TaskInfoResponse = {
-  fromJSON(object: any): TaskInfoResponse {
-    return {
-      status: isSet(object.status) ? Status.fromJSON(object.status) : undefined,
-      taskInfo: isSet(object.taskInfo) ? TaskInfo.fromJSON(object.taskInfo) : undefined,
-    };
-  },
-
-  toJSON(message: TaskInfoResponse): unknown {
-    const obj: any = {};
-    message.status !== undefined && (obj.status = message.status ? Status.toJSON(message.status) : undefined);
-    message.taskInfo !== undefined && (obj.taskInfo = message.taskInfo ? TaskInfo.toJSON(message.taskInfo) : undefined);
-    return obj;
-  },
-
-  fromPartial<I extends Exact<DeepPartial<TaskInfoResponse>, I>>(object: I): TaskInfoResponse {
-    const message = createBaseTaskInfoResponse();
-    message.status = (object.status !== undefined && object.status !== null)
-      ? Status.fromPartial(object.status)
-      : undefined;
-    message.taskInfo = (object.taskInfo !== undefined && object.taskInfo !== null)
-      ? TaskInfo.fromPartial(object.taskInfo)
-      : undefined;
-    return message;
-  },
-};
-
 function createBaseGetDataResponse(): GetDataResponse {
-  return { status: undefined, recordBatch: undefined };
+  return { recordBatch: undefined };
 }
 
 export const GetDataResponse = {
   fromJSON(object: any): GetDataResponse {
-    return {
-      status: isSet(object.status) ? Status.fromJSON(object.status) : undefined,
-      recordBatch: isSet(object.recordBatch) ? DataChunk.fromJSON(object.recordBatch) : undefined,
-    };
+    return { recordBatch: isSet(object.recordBatch) ? DataChunk.fromJSON(object.recordBatch) : undefined };
   },
 
   toJSON(message: GetDataResponse): unknown {
     const obj: any = {};
-    message.status !== undefined && (obj.status = message.status ? Status.toJSON(message.status) : undefined);
     message.recordBatch !== undefined &&
       (obj.recordBatch = message.recordBatch ? DataChunk.toJSON(message.recordBatch) : undefined);
     return obj;
@@ -366,9 +330,6 @@ export const GetDataResponse = {
 
   fromPartial<I extends Exact<DeepPartial<GetDataResponse>, I>>(object: I): GetDataResponse {
     const message = createBaseGetDataResponse();
-    message.status = (object.status !== undefined && object.status !== null)
-      ? Status.fromPartial(object.status)
-      : undefined;
     message.recordBatch = (object.recordBatch !== undefined && object.recordBatch !== null)
       ? DataChunk.fromPartial(object.recordBatch)
       : undefined;
diff --git a/proto/data.proto b/proto/data.proto
index 0db71cf446773..fba6ec6f4c1c0 100644
--- a/proto/data.proto
+++ b/proto/data.proto
@@ -97,6 +97,7 @@ enum ArrayType {
   LIST = 14;
   BYTEA = 15;
   JSONB = 16;
+  SERIAL = 17;
 }
 
 message Array {
diff --git a/src/common/src/array/arrow.rs b/src/common/src/array/arrow.rs
index d1bbff8f329e4..7fb068252b4f6 100644
--- a/src/common/src/array/arrow.rs
+++ b/src/common/src/array/arrow.rs
@@ -444,6 +444,7 @@ impl From<&ListArray> for arrow_array::ListArray {
                 |b, v| b.append_option(v.map(|d| d.into_arrow())),
             ),
             ArrayImpl::Jsonb(_) => todo!("list of jsonb"),
+            ArrayImpl::Serial(_) => todo!("list of serial"),
             ArrayImpl::Struct(_) => todo!("list of struct"),
             ArrayImpl::List(_) => todo!("list of list"),
             ArrayImpl::Bytea(a) => build(
diff --git a/src/common/src/array/mod.rs b/src/common/src/array/mod.rs
index c125b77ff8a31..7e955215bccc6 100644
--- a/src/common/src/array/mod.rs
+++ b/src/common/src/array/mod.rs
@@ -65,6 +65,7 @@ pub use utf8_array::*;
 pub use vis::{Vis, VisRef};
 
 pub use self::error::ArrayError;
+use crate::array::serial_array::{Serial, SerialArray, SerialArrayBuilder};
 use crate::buffer::Bitmap;
 use crate::types::*;
 use crate::util::iter_util::ZipEqFast;
@@ -347,6 +348,7 @@ macro_rules! for_all_variants {
             { NaiveDateTime, naivedatetime, NaiveDateTimeArray, NaiveDateTimeArrayBuilder },
             { NaiveTime, naivetime, NaiveTimeArray, NaiveTimeArrayBuilder },
             { Jsonb, jsonb, JsonbArray, JsonbArrayBuilder },
+            { Serial, serial, SerialArray, SerialArrayBuilder },
             { Struct, struct, StructArray, StructArrayBuilder },
             { List, list, ListArray, ListArrayBuilder },
             { Bytea, bytea, BytesArray, BytesArrayBuilder}
@@ -667,6 +669,9 @@ impl ArrayImpl {
             ProstArrayType::Int16 => read_numeric_array::<i16, I16ValueReader>(array, cardinality)?,
             ProstArrayType::Int32 => read_numeric_array::<i32, I32ValueReader>(array, cardinality)?,
             ProstArrayType::Int64 => read_numeric_array::<i64, I64ValueReader>(array, cardinality)?,
+            ProstArrayType::Serial => {
+                read_numeric_array::<Serial, SerialValueReader>(array, cardinality)?
+            }
             ProstArrayType::Float32 => {
                 read_numeric_array::<OrderedF32, F32ValueReader>(array, cardinality)?
             }
diff --git a/src/common/src/array/primitive_array.rs b/src/common/src/array/primitive_array.rs
index 8d521bea3fcb0..00601851dc733 100644
--- a/src/common/src/array/primitive_array.rs
+++ b/src/common/src/array/primitive_array.rs
@@ -21,6 +21,7 @@ use risingwave_pb::common::Buffer;
 use risingwave_pb::data::{Array as ProstArray, ArrayType};
 
 use super::{Array, ArrayBuilder, ArrayResult};
+use crate::array::serial_array::Serial;
 use crate::array::{ArrayBuilderImpl, ArrayImpl, ArrayMeta};
 use crate::buffer::{Bitmap, BitmapBuilder};
 use crate::for_all_native_types;
diff --git a/src/common/src/array/serial_array.rs b/src/common/src/array/serial_array.rs
index ae1b0f10c7773..0854112568440 100644
--- a/src/common/src/array/serial_array.rs
+++ b/src/common/src/array/serial_array.rs
@@ -17,11 +17,21 @@ use std::hash::Hash;
 use postgres_types::{ToSql as _, Type};
 use serde::{Serialize, Serializer};
 
-use crate::types::{Scalar, ScalarRef};
+use crate::array::{PrimitiveArray, PrimitiveArrayBuilder};
 
-#[derive(Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd)]
+// Serial is an alias for i64
+#[derive(Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd, Default, Hash)]
 pub struct Serial(i64);
 
+pub type SerialArray = PrimitiveArray<Serial>;
+pub type SerialArrayBuilder = PrimitiveArrayBuilder<Serial>;
+
+impl From<i64> for Serial {
+    fn from(value: i64) -> Self {
+        Self(value)
+    }
+}
+
 impl Serial {
     #[inline]
     pub fn into_inner(self) -> i64 {
@@ -62,25 +72,3 @@ impl crate::types::to_binary::ToBinary for Serial {
         Ok(Some(output.freeze()))
     }
 }
-
-/// Implement `Scalar` for `Serial`.
-impl Scalar for Serial {
-    type ScalarRefType<'a> = Serial;
-
-    fn as_scalar_ref(&self) -> Self::ScalarRefType<'_> {
-        Serial(self.0)
-    }
-}
-
-/// Implement `ScalarRef` for `Serial`.
-impl<'a> ScalarRef<'a> for Serial {
-    type ScalarType = Serial;
-
-    fn to_owned_scalar(&self) -> Serial {
-        *self
-    }
-
-    fn hash_scalar<H: std::hash::Hasher>(&self, state: &mut H) {
-        self.0.hash(state)
-    }
-}
diff --git a/src/common/src/array/value_reader.rs b/src/common/src/array/value_reader.rs
index 371f08bcd8002..294d38a382d73 100644
--- a/src/common/src/array/value_reader.rs
+++ b/src/common/src/array/value_reader.rs
@@ -19,7 +19,8 @@ use byteorder::{BigEndian, ReadBytesExt};
 
 use super::ArrayResult;
 use crate::array::{
-    ArrayBuilder, BytesArrayBuilder, JsonbArrayBuilder, PrimitiveArrayItemType, Utf8ArrayBuilder,
+    ArrayBuilder, BytesArrayBuilder, JsonbArrayBuilder, PrimitiveArrayItemType, Serial,
+    Utf8ArrayBuilder,
 };
 use crate::types::{Decimal, OrderedF32, OrderedF64};
 
@@ -31,6 +32,7 @@ pub trait PrimitiveValueReader<T: PrimitiveArrayItemType> {
 pub struct I16ValueReader;
 pub struct I32ValueReader;
 pub struct I64ValueReader;
+pub struct SerialValueReader;
 pub struct F32ValueReader;
 pub struct F64ValueReader;
 
@@ -50,6 +52,7 @@ macro_rules! impl_numeric_value_reader {
 impl_numeric_value_reader!(i16, I16ValueReader, read_i16);
 impl_numeric_value_reader!(i32, I32ValueReader, read_i32);
 impl_numeric_value_reader!(i64, I64ValueReader, read_i64);
+impl_numeric_value_reader!(Serial, SerialValueReader, read_i64);
 impl_numeric_value_reader!(OrderedF32, F32ValueReader, read_f32);
 impl_numeric_value_reader!(OrderedF64, F64ValueReader, read_f64);
 
diff --git a/src/common/src/hash/key.rs b/src/common/src/hash/key.rs
index 48901111f3288..0856bda158daa 100644
--- a/src/common/src/hash/key.rs
+++ b/src/common/src/hash/key.rs
@@ -30,6 +30,7 @@ use std::io::{Cursor, Read};
 use chrono::{Datelike, Timelike};
 use fixedbitset::FixedBitSet;
 
+use crate::array::serial_array::Serial;
 use crate::array::{
     Array, ArrayBuilder, ArrayBuilderImpl, ArrayError, ArrayImpl, ArrayResult, DataChunk, JsonbRef,
     ListRef, StructRef,
@@ -477,6 +478,18 @@ impl<'a> HashKeySerDe<'a> for JsonbRef<'a> {
     }
 }
 
+impl<'a> HashKeySerDe<'a> for Serial {
+    type S = <i64 as HashKeySerDe<'a>>::S;
+
+    fn serialize(self) -> Self::S {
+        self.into_inner().serialize()
+    }
+
+    fn deserialize<R: Read>(source: &mut R) -> Self {
+        i64::deserialize(source).into()
+    }
+}
+
 impl<'a> HashKeySerDe<'a> for StructRef<'a> {
     type S = Vec<u8>;
 
diff --git a/src/common/src/test_utils/rand_array.rs b/src/common/src/test_utils/rand_array.rs
index 01365d85c325d..6aa42d32acdbc 100644
--- a/src/common/src/test_utils/rand_array.rs
+++ b/src/common/src/test_utils/rand_array.rs
@@ -25,6 +25,7 @@ use rand::prelude::Distribution;
 use rand::rngs::SmallRng;
 use rand::{Rng, SeedableRng};
 
+use crate::array::serial_array::Serial;
 use crate::array::{Array, ArrayBuilder, ArrayRef, JsonbVal, ListValue, StructValue};
 use crate::types::{
     Decimal, IntervalUnit, NaiveDateTimeWrapper, NaiveDateWrapper, NaiveTimeWrapper, NativeType,
@@ -117,6 +118,13 @@ impl RandValue for bool {
     }
 }
 
+impl RandValue for Serial {
+    fn rand_value<R: Rng>(rand: &mut R) -> Self {
+        // TODO(peng), serial should be in format of RowId
+        i64::rand_value(rand).into()
+    }
+}
+
 impl RandValue for JsonbVal {
     fn rand_value<R: Rng>(_rand: &mut R) -> Self {
         JsonbVal::dummy()
@@ -177,6 +185,7 @@ where
 mod tests {
     use super::*;
     use crate::array::interval_array::IntervalArray;
+    use crate::array::serial_array::SerialArray;
     use crate::array::*;
     use crate::for_all_variants;
 
diff --git a/src/common/src/types/mod.rs b/src/common/src/types/mod.rs
index 62bf6b92b3c17..088abd7f5eb86 100644
--- a/src/common/src/types/mod.rs
+++ b/src/common/src/types/mod.rs
@@ -641,6 +641,7 @@ macro_rules! for_all_native_types {
             { i16, Int16 },
             { i32, Int32 },
            { i64, Int64 },
+            { Serial, Serial },
             { $crate::types::OrderedF32, Float32 },
             { $crate::types::OrderedF64, Float64 }
         }
diff --git a/src/common/src/types/native_type.rs b/src/common/src/types/native_type.rs
index ed42ead8865e8..c655b448ed86c 100644
--- a/src/common/src/types/native_type.rs
+++ b/src/common/src/types/native_type.rs
@@ -16,6 +16,7 @@ use std::fmt::Debug;
 use std::io::Write;
 
 use super::{OrderedF32, OrderedF64};
+use crate::array::serial_array::Serial;
 use crate::array::ArrayResult;
 
 pub trait NativeType:
@@ -42,6 +43,14 @@ impl NativeType for i64 {
     }
 }
 
+impl NativeType for Serial {
+    fn to_protobuf<T: Write>(self, output: &mut T) -> ArrayResult<usize> {
+        output
+            .write(&self.into_inner().to_be_bytes())
+            .map_err(Into::into)
+    }
+}
+
 impl NativeType for OrderedF32 {
     fn to_protobuf<T: Write>(self, output: &mut T) -> ArrayResult<usize> {
         output.write(&self.to_be_bytes()).map_err(Into::into)
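Reviewer note (not part of the diff): the core of this change is that `Serial` is now a plain newtype over `i64` that reuses the generic `PrimitiveArray` machinery, so every place that already handles `i64` (protobuf encoding, hash keys, random test data) can route through the same code. Below is a minimal sketch of the newtype behaviour, relying only on the `From<i64>` impl and `into_inner()` shown in `serial_array.rs`; the external path `risingwave_common::array::serial_array` is an assumption about module visibility, not something this diff establishes.

```rust
// Hypothetical standalone sketch; the import path assumes `serial_array`
// is reachable from outside the crate the same way it is used internally.
use risingwave_common::array::serial_array::Serial;

fn main() {
    // `Serial` wraps an i64 row id: `From<i64>` plus `into_inner()` give a
    // lossless round trip, which the protobuf writer and hash-key serializer
    // in this diff rely on.
    let row_id: i64 = 42;
    let serial = Serial::from(row_id);
    assert_eq!(serial.into_inner(), row_id);

    // Big-endian byte form, mirroring what `NativeType::to_protobuf` writes
    // for a Serial value.
    let bytes = serial.into_inner().to_be_bytes();
    assert_eq!(i64::from_be_bytes(bytes), row_id);
}
```

Since `SerialArray` and `SerialArrayBuilder` are just aliases for `PrimitiveArray<Serial>` and `PrimitiveArrayBuilder<Serial>`, Serial columns build, hash, and serialize exactly like the existing integer array types.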