diff --git a/ci/connector-node-version b/ci/connector-node-version deleted file mode 100644 index 1afd364109050..0000000000000 --- a/ci/connector-node-version +++ /dev/null @@ -1 +0,0 @@ -v0.1.17 \ No newline at end of file diff --git a/ci/scripts/docker.sh b/ci/scripts/docker.sh index fff87329d684c..d84cbc39016dc 100755 --- a/ci/scripts/docker.sh +++ b/ci/scripts/docker.sh @@ -6,11 +6,6 @@ set -euo pipefail ghcraddr="ghcr.io/risingwavelabs/risingwave" dockerhubaddr="risingwavelabs/risingwave" arch="$(uname -m)" -connector_node_version=$(cat ci/connector-node-version) - -# Git clone risingwave-connector-node repo -git clone https://"$GITHUB_TOKEN"@github.com/risingwavelabs/risingwave-connector-node.git -cd risingwave-connector-node && git checkout ${connector_node_version} && cd .. # Build RisingWave docker image ${BUILDKITE_COMMIT}-${arch} echo "--- docker build and tag" diff --git a/ci/scripts/release.sh b/ci/scripts/release.sh index bd28820161740..041c0c24611b9 100755 --- a/ci/scripts/release.sh +++ b/ci/scripts/release.sh @@ -3,7 +3,7 @@ # Exits as soon as any line fails. set -euo pipefail -connector_node_version=$(cat ci/connector-node-version) +REPO_ROOT=${PWD} echo "--- Check env" if [ "${BUILDKITE_SOURCE}" != "schedule" ] && [ "${BUILDKITE_SOURCE}" != "webhook" ] && [[ -z "${BINARY_NAME+x}" ]]; then @@ -63,10 +63,9 @@ if [[ -n "${BUILDKITE_TAG+x}" ]]; then gh release upload "${BUILDKITE_TAG}" risectl-"${BUILDKITE_TAG}"-x86_64-unknown-linux.tar.gz echo "--- Release build and upload risingwave connector node jar asset" -# git clone https://"$GITHUB_TOKEN"@github.com/risingwavelabs/risingwave-connector-node.git -# cd risingwave-connector-node && git checkout ${connector_node_version} && mvn -B package -Dmaven.test.skip=true -# cd assembly/target && mv risingwave-connector-1.0.0.tar.gz risingwave-connector-"${BUILDKITE_TAG}".tar.gz -# gh release upload "${BUILDKITE_TAG}" risingwave-connector-"${BUILDKITE_TAG}".tar.gz + cd ${REPO_ROOT}/java && mvn -B package -Dmaven.test.skip=true + cd connector-node/assembly/target && mv risingwave-connector-1.0.0.tar.gz risingwave-connector-"${BUILDKITE_TAG}".tar.gz + gh release upload "${BUILDKITE_TAG}" risingwave-connector-"${BUILDKITE_TAG}".tar.gz fi diff --git a/ci/scripts/s3-source-test.sh b/ci/scripts/s3-source-test.sh new file mode 100755 index 0000000000000..58c2cbd93863f --- /dev/null +++ b/ci/scripts/s3-source-test.sh @@ -0,0 +1,50 @@ +#!/usr/bin/env bash + +set -euo pipefail + +source ci/scripts/common.env.sh + +while getopts 'p:' opt; do + case ${opt} in + p ) + profile=$OPTARG + ;; + \? 
) + echo "Invalid Option: -$OPTARG" 1>&2 + exit 1 + ;; + : ) + echo "Invalid option: $OPTARG requires an argument" 1>&2 + ;; + esac +done +shift $((OPTIND -1)) + +echo "--- Download artifacts" +mkdir -p target/debug +buildkite-agent artifact download risingwave-"$profile" target/debug/ +buildkite-agent artifact download risedev-dev-"$profile" target/debug/ + +mv target/debug/risingwave-"$profile" target/debug/risingwave +mv target/debug/risedev-dev-"$profile" target/debug/risedev-dev + +echo "--- Adjust permission" +chmod +x ./target/debug/risingwave +chmod +x ./target/debug/risedev-dev + +echo "--- Generate RiseDev CI config" +cp ci/risedev-components.ci.env risedev-components.user.env + +echo "--- Prepare RiseDev dev cluster" +cargo make pre-start-dev +cargo make link-all-in-one-binaries + +echo "--- starting risingwave cluster with connector node" +cargo make ci-start ci-1cn-1fe + +echo "--- Run test" +python3 -m pip install minio psycopg2-binary +python3 e2e_test/s3/run.py + +echo "--- Kill cluster" +cargo make ci-kill diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml index efce639a358c9..766320c4fd34f 100644 --- a/ci/workflows/main-cron.yml +++ b/ci/workflows/main-cron.yml @@ -183,3 +183,35 @@ steps: files: ./**/*.sh timeout_in_minutes: 5 retry: *auto-retry + + - label: "S3 source check on AWS" + command: "ci/scripts/s3-source-test.sh -p ci-release" + depends_on: build + plugins: + - seek-oss/aws-sm#v2.3.1: + env: + S3_SOURCE_TEST_CONF: ci_s3_source_test_aws + - docker-compose#v4.9.0: + run: rw-build-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + environment: + - S3_SOURCE_TEST_CONF + timeout_in_minutes: 20 + retry: *auto-retry + + - label: "S3 source check on lyvecloud.seagate.com" + command: "ci/scripts/s3-source-test.sh -p ci-release" + depends_on: build + plugins: + - seek-oss/aws-sm#v2.3.1: + env: + S3_SOURCE_TEST_CONF: ci_s3_source_test_lyvecloud + - docker-compose#v4.9.0: + run: rw-build-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + environment: + - S3_SOURCE_TEST_CONF + timeout_in_minutes: 20 + retry: *auto-retry diff --git a/dashboard/components/Relations.tsx b/dashboard/components/Relations.tsx index 0f3dc54b3c9c0..118a335549e13 100644 --- a/dashboard/components/Relations.tsx +++ b/dashboard/components/Relations.tsx @@ -92,7 +92,7 @@ export const primaryKeyColumn: Column = { width: 1, content: (r) => r.pk - .map((order) => order.index) + .map((order) => order.columnIndex) .map((i) => r.columns[i]) .map((col) => extractColumnInfo(col)) .join(", "), diff --git a/dashboard/proto/gen/batch_plan.ts b/dashboard/proto/gen/batch_plan.ts index ab9083be3eead..73420d1086fd6 100644 --- a/dashboard/proto/gen/batch_plan.ts +++ b/dashboard/proto/gen/batch_plan.ts @@ -1,19 +1,24 @@ /* eslint-disable */ import { StreamSourceInfo } from "./catalog"; -import { BatchQueryEpoch, Buffer, HostAddress, WorkerNode } from "./common"; +import { + BatchQueryEpoch, + Buffer, + ColumnOrder, + Direction, + directionFromJSON, + directionToJSON, + HostAddress, + WorkerNode, +} from "./common"; import { IntervalUnit } from "./data"; import { AggCall, ExprNode, ProjectSetSelectItem, TableFunction } from "./expr"; import { ColumnCatalog, ColumnDesc, - ColumnOrder, Field, JoinType, joinTypeFromJSON, joinTypeToJSON, - OrderType, - orderTypeFromJSON, - orderTypeToJSON, StorageTableDesc, } from "./plan_common"; @@ -207,7 +212,7 @@ export interface SortMergeJoinNode { joinType: JoinType; leftKey: number[]; rightKey: number[]; - direction: OrderType; + 
direction: Direction; outputIndices: number[]; } @@ -1325,7 +1330,7 @@ function createBaseSortMergeJoinNode(): SortMergeJoinNode { joinType: JoinType.UNSPECIFIED, leftKey: [], rightKey: [], - direction: OrderType.ORDER_UNSPECIFIED, + direction: Direction.DIRECTION_UNSPECIFIED, outputIndices: [], }; } @@ -1336,7 +1341,7 @@ export const SortMergeJoinNode = { joinType: isSet(object.joinType) ? joinTypeFromJSON(object.joinType) : JoinType.UNSPECIFIED, leftKey: Array.isArray(object?.leftKey) ? object.leftKey.map((e: any) => Number(e)) : [], rightKey: Array.isArray(object?.rightKey) ? object.rightKey.map((e: any) => Number(e)) : [], - direction: isSet(object.direction) ? orderTypeFromJSON(object.direction) : OrderType.ORDER_UNSPECIFIED, + direction: isSet(object.direction) ? directionFromJSON(object.direction) : Direction.DIRECTION_UNSPECIFIED, outputIndices: Array.isArray(object?.outputIndices) ? object.outputIndices.map((e: any) => Number(e)) : [], }; }, @@ -1354,7 +1359,7 @@ export const SortMergeJoinNode = { } else { obj.rightKey = []; } - message.direction !== undefined && (obj.direction = orderTypeToJSON(message.direction)); + message.direction !== undefined && (obj.direction = directionToJSON(message.direction)); if (message.outputIndices) { obj.outputIndices = message.outputIndices.map((e) => Math.round(e)); } else { @@ -1368,7 +1373,7 @@ export const SortMergeJoinNode = { message.joinType = object.joinType ?? JoinType.UNSPECIFIED; message.leftKey = object.leftKey?.map((e) => e) || []; message.rightKey = object.rightKey?.map((e) => e) || []; - message.direction = object.direction ?? OrderType.ORDER_UNSPECIFIED; + message.direction = object.direction ?? Direction.DIRECTION_UNSPECIFIED; message.outputIndices = object.outputIndices?.map((e) => e) || []; return message; }, diff --git a/dashboard/proto/gen/catalog.ts b/dashboard/proto/gen/catalog.ts index c6f7c847be1ba..0590f0a959ce6 100644 --- a/dashboard/proto/gen/catalog.ts +++ b/dashboard/proto/gen/catalog.ts @@ -1,14 +1,8 @@ /* eslint-disable */ +import { ColumnOrder } from "./common"; import { DataType } from "./data"; import { ExprNode } from "./expr"; -import { - ColumnCatalog, - ColumnOrder, - Field, - RowFormatType, - rowFormatTypeFromJSON, - rowFormatTypeToJSON, -} from "./plan_common"; +import { ColumnCatalog, Field, RowFormatType, rowFormatTypeFromJSON, rowFormatTypeToJSON } from "./plan_common"; export const protobufPackage = "catalog"; @@ -258,6 +252,7 @@ export interface Table { handlePkConflictBehavior: HandleConflictBehavior; readPrefixLenHint: number; watermarkIndices: number[]; + distKeyInPk: number[]; /** * Per-table catalog version, used by schema change. `None` for internal tables and tests. * Not to be confused with the global catalog version for notification service. @@ -902,6 +897,7 @@ function createBaseTable(): Table { handlePkConflictBehavior: HandleConflictBehavior.NO_CHECK_UNSPECIFIED, readPrefixLenHint: 0, watermarkIndices: [], + distKeyInPk: [], version: undefined, }; } @@ -948,6 +944,7 @@ export const Table = { watermarkIndices: Array.isArray(object?.watermarkIndices) ? object.watermarkIndices.map((e: any) => Number(e)) : [], + distKeyInPk: Array.isArray(object?.distKeyInPk) ? object.distKeyInPk.map((e: any) => Number(e)) : [], version: isSet(object.version) ? 
Table_TableVersion.fromJSON(object.version) : undefined, }; }, @@ -1011,6 +1008,11 @@ export const Table = { } else { obj.watermarkIndices = []; } + if (message.distKeyInPk) { + obj.distKeyInPk = message.distKeyInPk.map((e) => Math.round(e)); + } else { + obj.distKeyInPk = []; + } message.version !== undefined && (obj.version = message.version ? Table_TableVersion.toJSON(message.version) : undefined); return obj; @@ -1057,6 +1059,7 @@ export const Table = { message.handlePkConflictBehavior = object.handlePkConflictBehavior ?? HandleConflictBehavior.NO_CHECK_UNSPECIFIED; message.readPrefixLenHint = object.readPrefixLenHint ?? 0; message.watermarkIndices = object.watermarkIndices?.map((e) => e) || []; + message.distKeyInPk = object.distKeyInPk?.map((e) => e) || []; message.version = (object.version !== undefined && object.version !== null) ? Table_TableVersion.fromPartial(object.version) : undefined; diff --git a/dashboard/proto/gen/common.ts b/dashboard/proto/gen/common.ts index b87219cf9e71e..f3cefe80d6710 100644 --- a/dashboard/proto/gen/common.ts +++ b/dashboard/proto/gen/common.ts @@ -61,6 +61,47 @@ export function workerTypeToJSON(object: WorkerType): string { } } +export const Direction = { + DIRECTION_UNSPECIFIED: "DIRECTION_UNSPECIFIED", + DIRECTION_ASCENDING: "DIRECTION_ASCENDING", + DIRECTION_DESCENDING: "DIRECTION_DESCENDING", + UNRECOGNIZED: "UNRECOGNIZED", +} as const; + +export type Direction = typeof Direction[keyof typeof Direction]; + +export function directionFromJSON(object: any): Direction { + switch (object) { + case 0: + case "DIRECTION_UNSPECIFIED": + return Direction.DIRECTION_UNSPECIFIED; + case 1: + case "DIRECTION_ASCENDING": + return Direction.DIRECTION_ASCENDING; + case 2: + case "DIRECTION_DESCENDING": + return Direction.DIRECTION_DESCENDING; + case -1: + case "UNRECOGNIZED": + default: + return Direction.UNRECOGNIZED; + } +} + +export function directionToJSON(object: Direction): string { + switch (object) { + case Direction.DIRECTION_UNSPECIFIED: + return "DIRECTION_UNSPECIFIED"; + case Direction.DIRECTION_ASCENDING: + return "DIRECTION_ASCENDING"; + case Direction.DIRECTION_DESCENDING: + return "DIRECTION_DESCENDING"; + case Direction.UNRECOGNIZED: + default: + return "UNRECOGNIZED"; + } +} + export interface Status { code: Status_Code; message: string; @@ -225,6 +266,20 @@ export interface BatchQueryEpoch { }; } +export interface OrderType { + /** + * TODO(rc): enable `NULLS FIRST | LAST` + * NullsAre nulls_are = 2; + */ + direction: Direction; +} + +/** Column index with an order type (ASC or DESC). Used to represent a sort key (`repeated ColumnOrder`). */ +export interface ColumnOrder { + columnIndex: number; + orderType: OrderType | undefined; +} + function createBaseStatus(): Status { return { code: Status_Code.UNSPECIFIED, message: "" }; } @@ -489,6 +544,60 @@ export const BatchQueryEpoch = { }, }; +function createBaseOrderType(): OrderType { + return { direction: Direction.DIRECTION_UNSPECIFIED }; +} + +export const OrderType = { + fromJSON(object: any): OrderType { + return { + direction: isSet(object.direction) ? directionFromJSON(object.direction) : Direction.DIRECTION_UNSPECIFIED, + }; + }, + + toJSON(message: OrderType): unknown { + const obj: any = {}; + message.direction !== undefined && (obj.direction = directionToJSON(message.direction)); + return obj; + }, + + fromPartial, I>>(object: I): OrderType { + const message = createBaseOrderType(); + message.direction = object.direction ?? 
Direction.DIRECTION_UNSPECIFIED; + return message; + }, +}; + +function createBaseColumnOrder(): ColumnOrder { + return { columnIndex: 0, orderType: undefined }; +} + +export const ColumnOrder = { + fromJSON(object: any): ColumnOrder { + return { + columnIndex: isSet(object.columnIndex) ? Number(object.columnIndex) : 0, + orderType: isSet(object.orderType) ? OrderType.fromJSON(object.orderType) : undefined, + }; + }, + + toJSON(message: ColumnOrder): unknown { + const obj: any = {}; + message.columnIndex !== undefined && (obj.columnIndex = Math.round(message.columnIndex)); + message.orderType !== undefined && + (obj.orderType = message.orderType ? OrderType.toJSON(message.orderType) : undefined); + return obj; + }, + + fromPartial, I>>(object: I): ColumnOrder { + const message = createBaseColumnOrder(); + message.columnIndex = object.columnIndex ?? 0; + message.orderType = (object.orderType !== undefined && object.orderType !== null) + ? OrderType.fromPartial(object.orderType) + : undefined; + return message; + }, +}; + declare var self: any | undefined; declare var window: any | undefined; declare var global: any | undefined; diff --git a/dashboard/proto/gen/expr.ts b/dashboard/proto/gen/expr.ts index 553597e94853e..08995062c54d2 100644 --- a/dashboard/proto/gen/expr.ts +++ b/dashboard/proto/gen/expr.ts @@ -1,6 +1,6 @@ /* eslint-disable */ +import { ColumnOrder } from "./common"; import { DataType, Datum } from "./data"; -import { OrderType, orderTypeFromJSON, orderTypeToJSON } from "./plan_common"; export const protobufPackage = "expr"; @@ -741,7 +741,7 @@ export interface AggCall { args: InputRef[]; returnType: DataType | undefined; distinct: boolean; - orderByFields: AggCall_OrderByField[]; + orderBy: ColumnOrder[]; filter: ExprNode | undefined; } @@ -858,12 +858,6 @@ export function aggCall_TypeToJSON(object: AggCall_Type): string { } } -export interface AggCall_OrderByField { - input: number; - direction: OrderType; - nullsFirst: boolean; -} - export interface UserDefinedFunction { children: ExprNode[]; name: string; @@ -1089,7 +1083,7 @@ function createBaseAggCall(): AggCall { args: [], returnType: undefined, distinct: false, - orderByFields: [], + orderBy: [], filter: undefined, }; } @@ -1101,9 +1095,7 @@ export const AggCall = { args: Array.isArray(object?.args) ? object.args.map((e: any) => InputRef.fromJSON(e)) : [], returnType: isSet(object.returnType) ? DataType.fromJSON(object.returnType) : undefined, distinct: isSet(object.distinct) ? Boolean(object.distinct) : false, - orderByFields: Array.isArray(object?.orderByFields) - ? object.orderByFields.map((e: any) => AggCall_OrderByField.fromJSON(e)) - : [], + orderBy: Array.isArray(object?.orderBy) ? object.orderBy.map((e: any) => ColumnOrder.fromJSON(e)) : [], filter: isSet(object.filter) ? ExprNode.fromJSON(object.filter) : undefined, }; }, @@ -1119,10 +1111,10 @@ export const AggCall = { message.returnType !== undefined && (obj.returnType = message.returnType ? DataType.toJSON(message.returnType) : undefined); message.distinct !== undefined && (obj.distinct = message.distinct); - if (message.orderByFields) { - obj.orderByFields = message.orderByFields.map((e) => e ? AggCall_OrderByField.toJSON(e) : undefined); + if (message.orderBy) { + obj.orderBy = message.orderBy.map((e) => e ? ColumnOrder.toJSON(e) : undefined); } else { - obj.orderByFields = []; + obj.orderBy = []; } message.filter !== undefined && (obj.filter = message.filter ? 
ExprNode.toJSON(message.filter) : undefined); return obj; @@ -1136,7 +1128,7 @@ export const AggCall = { ? DataType.fromPartial(object.returnType) : undefined; message.distinct = object.distinct ?? false; - message.orderByFields = object.orderByFields?.map((e) => AggCall_OrderByField.fromPartial(e)) || []; + message.orderBy = object.orderBy?.map((e) => ColumnOrder.fromPartial(e)) || []; message.filter = (object.filter !== undefined && object.filter !== null) ? ExprNode.fromPartial(object.filter) : undefined; @@ -1144,36 +1136,6 @@ export const AggCall = { }, }; -function createBaseAggCall_OrderByField(): AggCall_OrderByField { - return { input: 0, direction: OrderType.ORDER_UNSPECIFIED, nullsFirst: false }; -} - -export const AggCall_OrderByField = { - fromJSON(object: any): AggCall_OrderByField { - return { - input: isSet(object.input) ? Number(object.input) : 0, - direction: isSet(object.direction) ? orderTypeFromJSON(object.direction) : OrderType.ORDER_UNSPECIFIED, - nullsFirst: isSet(object.nullsFirst) ? Boolean(object.nullsFirst) : false, - }; - }, - - toJSON(message: AggCall_OrderByField): unknown { - const obj: any = {}; - message.input !== undefined && (obj.input = Math.round(message.input)); - message.direction !== undefined && (obj.direction = orderTypeToJSON(message.direction)); - message.nullsFirst !== undefined && (obj.nullsFirst = message.nullsFirst); - return obj; - }, - - fromPartial, I>>(object: I): AggCall_OrderByField { - const message = createBaseAggCall_OrderByField(); - message.input = object.input ?? 0; - message.direction = object.direction ?? OrderType.ORDER_UNSPECIFIED; - message.nullsFirst = object.nullsFirst ?? false; - return message; - }, -}; - function createBaseUserDefinedFunction(): UserDefinedFunction { return { children: [], name: "", argTypes: [], language: "", link: "", identifier: "" }; } diff --git a/dashboard/proto/gen/order.ts b/dashboard/proto/gen/order.ts new file mode 100644 index 0000000000000..6037394eadcee --- /dev/null +++ b/dashboard/proto/gen/order.ts @@ -0,0 +1,128 @@ +/* eslint-disable */ + +export const protobufPackage = "order"; + +export const PbDirection = { + PbDirection_UNSPECIFIED: "PbDirection_UNSPECIFIED", + PbDirection_ASCENDING: "PbDirection_ASCENDING", + PbDirection_DESCENDING: "PbDirection_DESCENDING", + UNRECOGNIZED: "UNRECOGNIZED", +} as const; + +export type PbDirection = typeof PbDirection[keyof typeof PbDirection]; + +export function pbDirectionFromJSON(object: any): PbDirection { + switch (object) { + case 0: + case "PbDirection_UNSPECIFIED": + return PbDirection.PbDirection_UNSPECIFIED; + case 1: + case "PbDirection_ASCENDING": + return PbDirection.PbDirection_ASCENDING; + case 2: + case "PbDirection_DESCENDING": + return PbDirection.PbDirection_DESCENDING; + case -1: + case "UNRECOGNIZED": + default: + return PbDirection.UNRECOGNIZED; + } +} + +export function pbDirectionToJSON(object: PbDirection): string { + switch (object) { + case PbDirection.PbDirection_UNSPECIFIED: + return "PbDirection_UNSPECIFIED"; + case PbDirection.PbDirection_ASCENDING: + return "PbDirection_ASCENDING"; + case PbDirection.PbDirection_DESCENDING: + return "PbDirection_DESCENDING"; + case PbDirection.UNRECOGNIZED: + default: + return "UNRECOGNIZED"; + } +} + +export interface PbOrderType { + /** + * TODO(rc): enable `NULLS FIRST | LAST` + * PbNullsAre nulls_are = 2; + */ + direction: PbDirection; +} + +/** Column index with an order type (ASC or DESC). Used to represent a sort key (`repeated PbColumnOrder`). 
*/
+export interface PbColumnOrder {
+  columnIndex: number;
+  orderType: PbOrderType | undefined;
+}
+
+function createBasePbOrderType(): PbOrderType {
+  return { direction: PbDirection.PbDirection_UNSPECIFIED };
+}
+
+export const PbOrderType = {
+  fromJSON(object: any): PbOrderType {
+    return {
+      direction: isSet(object.direction) ? pbDirectionFromJSON(object.direction) : PbDirection.PbDirection_UNSPECIFIED,
+    };
+  },
+
+  toJSON(message: PbOrderType): unknown {
+    const obj: any = {};
+    message.direction !== undefined && (obj.direction = pbDirectionToJSON(message.direction));
+    return obj;
+  },
+
+  fromPartial<I extends Exact<DeepPartial<PbOrderType>, I>>(object: I): PbOrderType {
+    const message = createBasePbOrderType();
+    message.direction = object.direction ?? PbDirection.PbDirection_UNSPECIFIED;
+    return message;
+  },
+};
+
+function createBasePbColumnOrder(): PbColumnOrder {
+  return { columnIndex: 0, orderType: undefined };
+}
+
+export const PbColumnOrder = {
+  fromJSON(object: any): PbColumnOrder {
+    return {
+      columnIndex: isSet(object.columnIndex) ? Number(object.columnIndex) : 0,
+      orderType: isSet(object.orderType) ? PbOrderType.fromJSON(object.orderType) : undefined,
+    };
+  },
+
+  toJSON(message: PbColumnOrder): unknown {
+    const obj: any = {};
+    message.columnIndex !== undefined && (obj.columnIndex = Math.round(message.columnIndex));
+    message.orderType !== undefined &&
+      (obj.orderType = message.orderType ? PbOrderType.toJSON(message.orderType) : undefined);
+    return obj;
+  },
+
+  fromPartial<I extends Exact<DeepPartial<PbColumnOrder>, I>>(object: I): PbColumnOrder {
+    const message = createBasePbColumnOrder();
+    message.columnIndex = object.columnIndex ?? 0;
+    message.orderType = (object.orderType !== undefined && object.orderType !== null)
+      ? PbOrderType.fromPartial(object.orderType)
+      : undefined;
+    return message;
+  },
+};
+
+type Builtin = Date | Function | Uint8Array | string | number | boolean | undefined;
+
+export type DeepPartial<T> = T extends Builtin ? T
+  : T extends Array<infer U> ? Array<DeepPartial<U>> : T extends ReadonlyArray<infer U> ? ReadonlyArray<DeepPartial<U>>
+  : T extends { $case: string } ? { [K in keyof Omit<T, "$case">]?: DeepPartial<T[K]> } & { $case: T["$case"] }
+  : T extends {} ? { [K in keyof T]?: DeepPartial<T[K]> }
+  : Partial<T>;
+
+type KeysOfUnion<T> = T extends T ? keyof T : never;
+export type Exact<P, I extends P> = P extends Builtin ? P
+  : P & { [K in keyof P]: Exact<P[K], I[K]> } & { [K in Exclude<keyof I, KeysOfUnion<P>>]: never };
+
+function isSet(value: any): boolean {
+  return value !== null && value !== undefined;
+}
diff --git a/dashboard/proto/gen/plan_common.ts b/dashboard/proto/gen/plan_common.ts
index 01cf2a88bfde3..6d9f7e3dff50d 100644
--- a/dashboard/proto/gen/plan_common.ts
+++ b/dashboard/proto/gen/plan_common.ts
@@ -1,4 +1,5 @@
 /* eslint-disable */
+import { ColumnOrder } from "./common";
 import { DataType } from "./data";
 
 export const protobufPackage = "plan_common";
@@ -84,47 +85,6 @@ export function joinTypeToJSON(object: JoinType): string {
   }
 }
 
-export const OrderType = {
-  ORDER_UNSPECIFIED: "ORDER_UNSPECIFIED",
-  ASCENDING: "ASCENDING",
-  DESCENDING: "DESCENDING",
-  UNRECOGNIZED: "UNRECOGNIZED",
-} as const;
-
-export type OrderType = typeof OrderType[keyof typeof OrderType];
-
-export function orderTypeFromJSON(object: any): OrderType {
-  switch (object) {
-    case 0:
-    case "ORDER_UNSPECIFIED":
-      return OrderType.ORDER_UNSPECIFIED;
-    case 1:
-    case "ASCENDING":
-      return OrderType.ASCENDING;
-    case 2:
-    case "DESCENDING":
-      return OrderType.DESCENDING;
-    case -1:
-    case "UNRECOGNIZED":
-    default:
-      return OrderType.UNRECOGNIZED;
-  }
-}
-
-export function orderTypeToJSON(object: OrderType): string {
-  switch (object) {
-    case OrderType.ORDER_UNSPECIFIED:
-      return "ORDER_UNSPECIFIED";
-    case OrderType.ASCENDING:
-      return "ASCENDING";
-    case OrderType.DESCENDING:
-      return "DESCENDING";
-    case OrderType.UNRECOGNIZED:
-    default:
-      return "UNRECOGNIZED";
-  }
-}
-
 export const RowFormatType = {
   ROW_UNSPECIFIED: "ROW_UNSPECIFIED",
   JSON: "JSON",
@@ -260,13 +220,6 @@ export interface StorageTableDesc {
   readPrefixLenHint: number;
 }
 
-/** Column index with an order type (ASC or DESC). Used to represent a sort key (`repeated ColumnOrder`). */
-export interface ColumnOrder {
-  /** maybe other name */
-  orderType: OrderType;
-  index: number;
-}
-
 function createBaseField(): Field {
   return { dataType: undefined, name: "" };
 }
@@ -435,33 +388,6 @@ export const StorageTableDesc = {
   },
 };
 
-function createBaseColumnOrder(): ColumnOrder {
-  return { orderType: OrderType.ORDER_UNSPECIFIED, index: 0 };
-}
-
-export const ColumnOrder = {
-  fromJSON(object: any): ColumnOrder {
-    return {
-      orderType: isSet(object.orderType) ? orderTypeFromJSON(object.orderType) : OrderType.ORDER_UNSPECIFIED,
-      index: isSet(object.index) ? Number(object.index) : 0,
-    };
-  },
-
-  toJSON(message: ColumnOrder): unknown {
-    const obj: any = {};
-    message.orderType !== undefined && (obj.orderType = orderTypeToJSON(message.orderType));
-    message.index !== undefined && (obj.index = Math.round(message.index));
-    return obj;
-  },
-
-  fromPartial<I extends Exact<DeepPartial<ColumnOrder>, I>>(object: I): ColumnOrder {
-    const message = createBaseColumnOrder();
-    message.orderType = object.orderType ?? OrderType.ORDER_UNSPECIFIED;
-    message.index = object.index ?? 0;
-    return message;
-  },
-};
-
 type Builtin = Date | Function | Uint8Array | string | number | boolean | undefined;
 
 export type DeepPartial<T> = T extends Builtin ? T
diff --git a/dashboard/proto/gen/stream_plan.ts b/dashboard/proto/gen/stream_plan.ts
index 98fd2b55f6349..1e150e5dc03cd 100644
--- a/dashboard/proto/gen/stream_plan.ts
+++ b/dashboard/proto/gen/stream_plan.ts
@@ -1,12 +1,11 @@
 /* eslint-disable */
 import { SinkType, sinkTypeFromJSON, sinkTypeToJSON, StreamSourceInfo, Table, WatermarkDesc } from "./catalog";
-import { Buffer } from "./common";
+import { Buffer, ColumnOrder } from "./common";
 import { Datum, Epoch, IntervalUnit, StreamChunk } from "./data";
 import { AggCall, ExprNode, InputRef, ProjectSetSelectItem } from "./expr";
 import {
   ColumnCatalog,
   ColumnDesc,
-  ColumnOrder,
   Field,
   JoinType,
   joinTypeFromJSON,
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 506e4daf12a0a..eeb2e8a746185 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -33,14 +33,14 @@ RUN rustup self update \
     && rustup component add rustfmt
 
 RUN cargo fetch
-RUN mkdir -p /risingwave/bin/connector-node
 
 RUN cargo build -p risingwave_cmd_all --release --features "static-link static-log-level" && \
-  mv /risingwave/target/release/risingwave /risingwave/bin/ && \
+  mkdir -p /risingwave/bin && mv /risingwave/target/release/risingwave /risingwave/bin/ && \
   cargo clean
-RUN cd risingwave-connector-node && mvn -B package -Dmaven.test.skip=true
-RUN tar -zxvf /risingwave/risingwave-connector-node/assembly/target/risingwave-connector-1.0.0.tar.gz -C /risingwave/bin/connector-node
+RUN cd /risingwave/java && mvn -B package -Dmaven.test.skip=true && \
+  mkdir -p /risingwave/bin/connector-node && \
+  tar -zxvf /risingwave/java/connector-node/assembly/target/risingwave-connector-1.0.0.tar.gz -C /risingwave/bin/connector-node
 
 FROM ubuntu:22.04 as image-base
 RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get -y install ca-certificates openjdk-11-jdk && rm -rf /var/lib/{apt,dpkg,cache,log}/
diff --git a/e2e_test/s3/run.py b/e2e_test/s3/run.py
new file mode 100644
index 0000000000000..d90ab16e93db1
--- /dev/null
+++ b/e2e_test/s3/run.py
@@ -0,0 +1,127 @@
+import os
+import string
+import json
+from time import sleep
+from minio import Minio
+import psycopg2
+import random
+
+def do_test(config, N, n, prefix):
+    conn = psycopg2.connect(
+        host="localhost",
+        port="4566",
+        user="root",
+        database="dev"
+    )
+
+    # Open a cursor to execute SQL statements
+    cur = conn.cursor()
+
+    # Create the S3 source table
+    cur.execute(f'''CREATE TABLE s3_test(
+        id int,
+        name TEXT,
+        sex int,
+        mark int,
+    ) WITH (
+        connector = 's3',
+        match_pattern = '{prefix}*.ndjson',
+        s3.region_name = '{config['S3_REGION']}',
+        s3.bucket_name = '{config['S3_BUCKET']}',
+        s3.credentials.access = '{config['S3_ACCESS_KEY']}',
+        s3.credentials.secret = '{config['S3_SECRET_KEY']}',
+        s3.endpoint_url = 'https://{config['S3_ENDPOINT']}'
+    ) ROW FORMAT json;''')
+
+    total_row = int(N * n)
+    sleep(120)
+    while True:
+        sleep(60)
+        cur.execute('select count(*) from s3_test')
+        result = cur.fetchone()
+        if result[0] == total_row:
+            break
+        print(
+            f"Now got {result[0]} rows in table, {total_row} expected, wait 60s")
+
+    cur.execute('select count(*), sum(id), sum(sex), sum(mark) from s3_test')
+    result = cur.fetchone()
+
+    print(result)
+
+    assert result[0] == total_row
+    assert result[1] == int(((N - 1) * N / 2) * n)
+    assert result[2] == int(N*n / 2)
+    assert result[3] == 0
+
+    cur.execute('drop table s3_test')
+
+    cur.close()
+    conn.close()
+
+
+if __name__ == "__main__":
+    config = json.loads(os.environ["S3_SOURCE_TEST_CONF"])
+    run_id = str(random.randint(1000, 9999))
+    N = 10000
+
+    items = [
+        {
+            "id": j,
+            "name": str(j),
+            "sex": j % 2,
+            "mark": -1 if j % 2 else 1,
+        }
+        for j in range(N)
+    ]
+
+    data = "\n".join([json.dumps(item) for item in items]) + "\n"
+    n = 0
+    with open("data_0.ndjson", "w") as f:
+        for _ in range(1000):
+            n += 1
+            f.write(data)
+        os.fsync(f.fileno())
+
+    for i in range(1, 20):
+        with open(f"data_{i}.ndjson", "w") as f:
+            n += 1
+            f.write(data)
+            os.fsync(f.fileno())
+
+    client = Minio(
+        config["S3_ENDPOINT"],
+        access_key=config["S3_ACCESS_KEY"],
+        secret_key=config["S3_SECRET_KEY"],
+        secure=True
+    )
+
+    for i in range(20):
+        try:
+            client.fput_object(
+                config["S3_BUCKET"],
+                f"{run_id}_data_{i}.ndjson",
+                f"data_{i}.ndjson"
+            )
+            print(f"Uploaded {run_id}_data_{i}.ndjson to S3")
+            os.remove(f"data_{i}.ndjson")
+        except Exception as e:
+            print(f"Error uploading data_{i}.ndjson: {e}")
+
+    return_code = 0
+    try:
+        do_test(config, N, n, run_id)
+    except Exception as e:
+        print("Test failed", e)
+        return_code = 1
+
+    # Clean up
+    for i in range(20):
+        try:
+            client.remove_object(config["S3_BUCKET"], f"{run_id}_data_{i}.ndjson")
+            print(f"Removed {run_id}_data_{i}.ndjson from S3")
+        except Exception as e:
+            print(f"Error removing data_{i}.ndjson: {e}")
+
+    exit(return_code)
diff --git a/e2e_test/streaming/bug_fixes/issue_7698.slt b/e2e_test/streaming/bug_fixes/issue_7698.slt
new file mode 100644
index 0000000000000..585a43cf99278
--- /dev/null
+++ b/e2e_test/streaming/bug_fixes/issue_7698.slt
@@ -0,0 +1,19 @@
+# https://github.com/risingwavelabs/risingwave/issues/7698
+
+statement ok
+SET RW_IMPLICIT_FLUSH TO true;
+
+statement ok
+create table t (src int, dst int);
+
+statement ok
+create materialized view cycle_3 as select t1.src p1, t1.dst p2, t2.dst p3 from t t1, t t2, t t3 where t1.dst = t2.src and t2.src = t3.dst and t3.dst = t1.src;
+
+statement ok
+insert into t values (1, 2), (2, 1);
+
+statement ok
+drop materialized view cycle_3;
+
+statement ok
+drop table t;
diff --git a/java/connector-node/Dockerfile b/java/connector-node/Dockerfile
deleted file mode 100644
index e4d079b57c5d1..0000000000000
--- a/java/connector-node/Dockerfile
+++ /dev/null
@@ -1,17 +0,0 @@
-# USAGE: `cd && docker build -t -f Dockerfile`.
-# There is a built image in `https://github.com/risingwavelabs/risingwave/pkgs/container/risingwave-connector-node`.
-# It's not recommended to build it yourself.
-
-FROM maven:3.8.6-openjdk-11-slim AS builder
-
-COPY .
/code -WORKDIR /code - -RUN mvn --no-transfer-progress -T 8 clean package -Dmaven.test.skip && cd /code/assembly/target \ - && mkdir tar-output && tar xf risingwave-connector-1.0.0.tar.gz -C tar-output - -FROM openjdk:11 - -COPY --from=builder /code/assembly/target/tar-output /risingwave/bin/connector-node - -ENTRYPOINT ["/risingwave/bin/connector-node/start-service.sh"] diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto index 2e7c66dd344ae..3ffdda271d647 100644 --- a/proto/batch_plan.proto +++ b/proto/batch_plan.proto @@ -113,18 +113,18 @@ message ValuesNode { } message SortNode { - repeated plan_common.ColumnOrder column_orders = 1; + repeated common.ColumnOrder column_orders = 1; } message TopNNode { - repeated plan_common.ColumnOrder column_orders = 1; + repeated common.ColumnOrder column_orders = 1; uint64 limit = 2; uint64 offset = 3; bool with_ties = 4; } message GroupTopNNode { - repeated plan_common.ColumnOrder column_orders = 1; + repeated common.ColumnOrder column_orders = 1; uint64 limit = 2; uint64 offset = 3; repeated uint32 group_key = 4; @@ -178,7 +178,7 @@ message SortMergeJoinNode { plan_common.JoinType join_type = 1; repeated int32 left_key = 2; repeated int32 right_key = 3; - plan_common.OrderType direction = 4; + common.Direction direction = 4; repeated uint32 output_indices = 5; } @@ -231,7 +231,7 @@ message ExchangeNode { message MergeSortExchangeNode { ExchangeNode exchange = 1; - repeated plan_common.ColumnOrder column_orders = 2; + repeated common.ColumnOrder column_orders = 2; } message LocalLookupJoinNode { diff --git a/proto/catalog.proto b/proto/catalog.proto index d5f3daf86e584..fbd40076fd670 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -2,6 +2,7 @@ syntax = "proto3"; package catalog; +import "common.proto"; import "data.proto"; import "expr.proto"; import "plan_common.proto"; @@ -72,7 +73,7 @@ message Sink { uint32 database_id = 3; string name = 4; repeated plan_common.ColumnCatalog columns = 5; - repeated plan_common.ColumnOrder pk = 6; + repeated common.ColumnOrder pk = 6; repeated uint32 dependent_relations = 7; repeated int32 distribution_key = 8; // pk_indices of the corresponding materialize operator's output. @@ -134,7 +135,7 @@ message Table { uint32 database_id = 3; string name = 4; repeated plan_common.ColumnCatalog columns = 5; - repeated plan_common.ColumnOrder pk = 6; + repeated common.ColumnOrder pk = 6; repeated uint32 dependent_relations = 8; oneof optional_associated_source_id { uint32 associated_source_id = 9; @@ -161,6 +162,7 @@ message Table { HandleConflictBehavior handle_pk_conflict_behavior = 22; uint32 read_prefix_len_hint = 23; repeated int32 watermark_indices = 24; + repeated int32 dist_key_in_pk = 25; // Per-table catalog version, used by schema change. `None` for internal tables and tests. // Not to be confused with the global catalog version for notification service. 
 TableVersion version = 100;
diff --git a/proto/common.proto b/proto/common.proto
index 1cf2e8e594307..546232538b84a 100644
--- a/proto/common.proto
+++ b/proto/common.proto
@@ -75,3 +75,28 @@ message BatchQueryEpoch {
     uint64 backup = 3;
   }
 }
+
+enum Direction {
+  DIRECTION_UNSPECIFIED = 0;
+  DIRECTION_ASCENDING = 1;
+  DIRECTION_DESCENDING = 2;
+}
+
+// TODO(rc): enable `NULLS FIRST | LAST`
+// enum NullsAre {
+//   NULLS_ARE_UNSPECIFIED = 0;
+//   NULLS_ARE_SMALLEST = 1;
+//   NULLS_ARE_LARGEST = 2;
+// }
+
+message OrderType {
+  Direction direction = 1;
+  // TODO(rc): enable `NULLS FIRST | LAST`
+  // NullsAre nulls_are = 2;
+}
+
+// Column index with an order type (ASC or DESC). Used to represent a sort key (`repeated ColumnOrder`).
+message ColumnOrder {
+  uint32 column_index = 1;
+  OrderType order_type = 2;
+}
diff --git a/proto/expr.proto b/proto/expr.proto
index 0f54a58baf437..e187490cc0324 100644
--- a/proto/expr.proto
+++ b/proto/expr.proto
@@ -2,8 +2,8 @@ syntax = "proto3";
 
 package expr;
 
+import "common.proto";
 import "data.proto";
-import "plan_common.proto";
 
 option java_package = "com.risingwave.proto";
 option optimize_for = SPEED;
@@ -228,12 +228,7 @@ message AggCall {
   repeated InputRef args = 2;
   data.DataType return_type = 3;
   bool distinct = 4;
-  message OrderByField {
-    uint32 input = 1;
-    plan_common.OrderType direction = 3;
-    bool nulls_first = 4;
-  }
-  repeated OrderByField order_by_fields = 5;
+  repeated common.ColumnOrder order_by = 5;
   ExprNode filter = 6;
 }
diff --git a/proto/plan_common.proto b/proto/plan_common.proto
index d862a929d3983..b378dd90e5365 100644
--- a/proto/plan_common.proto
+++ b/proto/plan_common.proto
@@ -2,6 +2,7 @@ syntax = "proto3";
 
 package plan_common;
 
+import "common.proto";
 import "data.proto";
 
 option java_package = "com.risingwave.proto";
@@ -36,7 +37,7 @@ message StorageTableDesc {
   uint32 table_id = 1;
   repeated ColumnDesc columns = 2;
   // TODO: may refactor primary key representations
-  repeated ColumnOrder pk = 3;
+  repeated common.ColumnOrder pk = 3;
   repeated uint32 dist_key_indices = 4;
   uint32 retention_seconds = 5;
   repeated uint32 value_indices = 6;
@@ -57,19 +58,6 @@ enum JoinType {
   RIGHT_ANTI = 8;
 }
 
-enum OrderType {
-  ORDER_UNSPECIFIED = 0;
-  ASCENDING = 1;
-  DESCENDING = 2;
-}
-
-// Column index with an order type (ASC or DESC). Used to represent a sort key (`repeated ColumnOrder`).
-message ColumnOrder {
-  // maybe other name
-  OrderType order_type = 1;
-  uint32 index = 2;
-}
-
 enum RowFormatType {
   ROW_UNSPECIFIED = 0;
   JSON = 1;
diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto
index 91f9b90565348..1d084a3ef3349 100644
--- a/proto/stream_plan.proto
+++ b/proto/stream_plan.proto
@@ -142,7 +142,7 @@ message SinkDesc {
   string name = 2;
   string definition = 3;
   repeated plan_common.ColumnDesc columns = 4;
-  repeated plan_common.ColumnOrder pk = 5;
+  repeated common.ColumnOrder pk = 5;
   repeated uint32 stream_key = 6;
   repeated uint32 distribution_key = 7;
   map<string, string> properties = 8;
@@ -176,7 +176,7 @@ message FilterNode {
 message MaterializeNode {
   uint32 table_id = 1;
   // Column indexes and orders of primary key.
-  repeated plan_common.ColumnOrder column_orders = 2;
+  repeated common.ColumnOrder column_orders = 2;
   // Used for internal table states.
   catalog.Table table = 3;
   // Used to handle pk conflict, open it when upstream executor is source executor.
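The `common.proto` hunk above is the heart of this refactor: a sort key is now `repeated common.ColumnOrder`, where each entry pairs a column index with an `OrderType` that, for now, only carries a `Direction`. A minimal sketch of what such a sort key looks like to a consumer — the `Direction`/`OrderType`/`ColumnOrder` types below are hand-written stand-ins mirroring the proto messages, not the actual prost-generated `risingwave_pb::common` API:

```rust
// Hand-rolled mirrors of the new messages in proto/common.proto, for illustration only.
#[derive(Clone, Copy, Debug)]
enum Direction {
    Unspecified,
    Ascending,
    Descending,
}

#[derive(Clone, Copy, Debug)]
struct OrderType {
    direction: Direction,
    // The patch leaves a TODO(rc) for a future `NullsAre` field (`NULLS FIRST | LAST`).
}

#[derive(Clone, Copy, Debug)]
struct ColumnOrder {
    column_index: u32,
    order_type: OrderType,
}

fn main() {
    // Something like `ORDER BY c1 ASC, c0 DESC` is carried as a sort key like this:
    let sort_key = [
        ColumnOrder { column_index: 1, order_type: OrderType { direction: Direction::Ascending } },
        ColumnOrder { column_index: 0, order_type: OrderType { direction: Direction::Descending } },
    ];
    for co in &sort_key {
        println!("column #{} {:?}", co.column_index, co.order_type.direction);
    }
}
```

This single message replaces the old `plan_common.ColumnOrder { order_type, index }` pair and the flat `plan_common.OrderType` enum, which is why the dashboard code switches from `order.index` to `order.columnIndex` and `AggCall`'s `order_by_fields` collapses into a plain `order_by` list.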
@@ -243,7 +243,7 @@ message TopNNode { uint64 limit = 1; uint64 offset = 2; catalog.Table table = 3; - repeated plan_common.ColumnOrder order_by = 4; + repeated common.ColumnOrder order_by = 4; bool with_ties = 5; } @@ -253,7 +253,7 @@ message GroupTopNNode { uint64 offset = 2; repeated uint32 group_key = 3; catalog.Table table = 4; - repeated plan_common.ColumnOrder order_by = 5; + repeated common.ColumnOrder order_by = 5; bool with_ties = 6; } @@ -402,7 +402,7 @@ message BatchPlanNode { message ArrangementInfo { // Order key of the arrangement, including order by columns and pk from the materialize // executor. - repeated plan_common.ColumnOrder arrange_key_orders = 1; + repeated common.ColumnOrder arrange_key_orders = 1; // Column descs of the arrangement repeated plan_common.ColumnDesc column_descs = 2; // Used to build storage table by stream lookup join of delta join. diff --git a/src/batch/benches/hash_agg.rs b/src/batch/benches/hash_agg.rs index 4e2e0ccfc55d6..0e5ec4dd175db 100644 --- a/src/batch/benches/hash_agg.rs +++ b/src/batch/benches/hash_agg.rs @@ -44,7 +44,7 @@ fn create_agg_call( .collect(), return_type: Some(return_type.to_protobuf()), distinct: false, - order_by_fields: vec![], + order_by: vec![], filter: None, } } diff --git a/src/batch/src/executor/group_top_n.rs b/src/batch/src/executor/group_top_n.rs index a4a7fe1b0dbc1..d925ad4e6081f 100644 --- a/src/batch/src/executor/group_top_n.rs +++ b/src/batch/src/executor/group_top_n.rs @@ -101,7 +101,7 @@ impl BoxedExecutorBuilder for GroupTopNExecutorBuilder { let order_pairs = top_n_node .column_orders .iter() - .map(OrderPair::from_prost) + .map(OrderPair::from_protobuf) .collect(); let group_key = top_n_node @@ -295,7 +295,7 @@ mod tests { i i i 4 2 1 3 3 1 - 2 4 1 + 2 4 1 4 3 2 3 4 2 2 5 2 diff --git a/src/batch/src/executor/hash_agg.rs b/src/batch/src/executor/hash_agg.rs index 4faca6c37b4ab..6232e78b470ba 100644 --- a/src/batch/src/executor/hash_agg.rs +++ b/src/batch/src/executor/hash_agg.rs @@ -319,7 +319,7 @@ mod tests { ..Default::default() }), distinct: false, - order_by_fields: vec![], + order_by: vec![], filter: None, }; @@ -387,7 +387,7 @@ mod tests { ..Default::default() }), distinct: false, - order_by_fields: vec![], + order_by: vec![], filter: None, }; diff --git a/src/batch/src/executor/join/distributed_lookup_join.rs b/src/batch/src/executor/join/distributed_lookup_join.rs index 83992e9f36745..8fe5b768cd871 100644 --- a/src/batch/src/executor/join/distributed_lookup_join.rs +++ b/src/batch/src/executor/join/distributed_lookup_join.rs @@ -29,7 +29,6 @@ use risingwave_common::util::sort_util::OrderType; use risingwave_expr::expr::{build_from_prost, BoxedExpression}; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::common::BatchQueryEpoch; -use risingwave_pb::plan_common::OrderType as ProstOrderType; use risingwave_storage::store::PrefetchOptions; use risingwave_storage::table::batch_table::storage_table::StorageTable; use risingwave_storage::table::{Distribution, TableIter}; @@ -187,12 +186,14 @@ impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder { let order_types: Vec = table_desc .pk .iter() - .map(|order| { - OrderType::from_prost(&ProstOrderType::from_i32(order.order_type).unwrap()) - }) + .map(|order| OrderType::from_protobuf(&order.get_order_type().unwrap().direction())) .collect(); - let pk_indices = table_desc.pk.iter().map(|k| k.index as usize).collect_vec(); + let pk_indices = table_desc + .pk + .iter() + .map(|k| k.column_index as usize) + .collect_vec(); let 
dist_key_indices = table_desc .dist_key_indices diff --git a/src/batch/src/executor/join/local_lookup_join.rs b/src/batch/src/executor/join/local_lookup_join.rs index 3454032a6f104..97e8278b1c3cf 100644 --- a/src/batch/src/executor/join/local_lookup_join.rs +++ b/src/batch/src/executor/join/local_lookup_join.rs @@ -87,7 +87,7 @@ impl InnerSideExecutorBuilder { .table_desc .pk .iter() - .map(|col| col.index as _) + .map(|col| col.column_index as usize) .collect_vec(); let virtual_node = scan_range.try_compute_vnode(&dist_keys, &pk_indices); diff --git a/src/batch/src/executor/merge_sort_exchange.rs b/src/batch/src/executor/merge_sort_exchange.rs index 6e9b47dfa304f..1ca859cae29c8 100644 --- a/src/batch/src/executor/merge_sort_exchange.rs +++ b/src/batch/src/executor/merge_sort_exchange.rs @@ -194,7 +194,7 @@ impl BoxedExecutorBuilder for MergeSortExchangeExecutorBuilder { let order_pairs = sort_merge_node .column_orders .iter() - .map(OrderPair::from_prost) + .map(OrderPair::from_protobuf) .collect(); let order_pairs = Arc::new(order_pairs); diff --git a/src/batch/src/executor/order_by.rs b/src/batch/src/executor/order_by.rs index 6688016ab1c62..d305e1e59ea71 100644 --- a/src/batch/src/executor/order_by.rs +++ b/src/batch/src/executor/order_by.rs @@ -67,7 +67,7 @@ impl BoxedExecutorBuilder for SortExecutor { let order_pairs = order_by_node .column_orders .iter() - .map(OrderPair::from_prost) + .map(OrderPair::from_protobuf) .collect(); Ok(Box::new(SortExecutor::new( child, diff --git a/src/batch/src/executor/row_seq_scan.rs b/src/batch/src/executor/row_seq_scan.rs index f2e2f7dcb1c32..460315dc79bf2 100644 --- a/src/batch/src/executor/row_seq_scan.rs +++ b/src/batch/src/executor/row_seq_scan.rs @@ -31,7 +31,7 @@ use risingwave_common::util::value_encoding::deserialize_datum; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::{scan_range, ScanRange as ProstScanRange}; use risingwave_pb::common::BatchQueryEpoch; -use risingwave_pb::plan_common::{OrderType as ProstOrderType, StorageTableDesc}; +use risingwave_pb::plan_common::StorageTableDesc; use risingwave_storage::store::PrefetchOptions; use risingwave_storage::table::batch_table::storage_table::StorageTable; use risingwave_storage::table::{Distribution, TableIter}; @@ -188,17 +188,19 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder { let pk_types = table_desc .pk .iter() - .map(|order| column_descs[order.index as usize].clone().data_type) + .map(|order| column_descs[order.column_index as usize].clone().data_type) .collect_vec(); let order_types: Vec = table_desc .pk .iter() - .map(|order| { - OrderType::from_prost(&ProstOrderType::from_i32(order.order_type).unwrap()) - }) + .map(|order| OrderType::from_protobuf(&order.get_order_type().unwrap().direction())) .collect(); - let pk_indices = table_desc.pk.iter().map(|k| k.index as usize).collect_vec(); + let pk_indices = table_desc + .pk + .iter() + .map(|k| k.column_index as usize) + .collect_vec(); let dist_key_indices = table_desc .dist_key_indices diff --git a/src/batch/src/executor/sort_agg.rs b/src/batch/src/executor/sort_agg.rs index e62769deff722..e8fcb784e7b49 100644 --- a/src/batch/src/executor/sort_agg.rs +++ b/src/batch/src/executor/sort_agg.rs @@ -340,7 +340,7 @@ mod tests { ..Default::default() }), distinct: false, - order_by_fields: vec![], + order_by: vec![], filter: None, }; @@ -434,7 +434,7 @@ mod tests { ..Default::default() }), distinct: false, - order_by_fields: vec![], + order_by: vec![], filter: None, }; @@ -563,7 +563,7 @@ mod 
tests { ..Default::default() }), distinct: false, - order_by_fields: vec![], + order_by: vec![], filter: None, }; @@ -648,7 +648,7 @@ mod tests { ..Default::default() }), distinct: false, - order_by_fields: vec![], + order_by: vec![], filter: None, }; @@ -772,7 +772,7 @@ mod tests { ..Default::default() }), distinct: false, - order_by_fields: vec![], + order_by: vec![], filter: None, }; diff --git a/src/batch/src/executor/top_n.rs b/src/batch/src/executor/top_n.rs index 0ba9b36b53559..a3a557189dc97 100644 --- a/src/batch/src/executor/top_n.rs +++ b/src/batch/src/executor/top_n.rs @@ -60,7 +60,7 @@ impl BoxedExecutorBuilder for TopNExecutor { let order_pairs = top_n_node .column_orders .iter() - .map(OrderPair::from_prost) + .map(OrderPair::from_protobuf) .collect(); Ok(Box::new(Self::new( child, diff --git a/src/batch/src/lib.rs b/src/batch/src/lib.rs index 46ba2a6620a47..8500e3099666b 100644 --- a/src/batch/src/lib.rs +++ b/src/batch/src/lib.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![allow(incomplete_features)] #![expect(dead_code)] #![allow(clippy::derive_partial_eq_without_eq)] #![feature(trait_alias)] @@ -29,6 +30,7 @@ #![feature(let_chains)] #![feature(bound_map)] #![feature(int_roundings)] +#![feature(async_fn_in_trait)] mod error; pub mod exchange_source; diff --git a/src/batch/src/task/broadcast_channel.rs b/src/batch/src/task/broadcast_channel.rs index b8307c1bc33a9..566c4e789e377 100644 --- a/src/batch/src/task/broadcast_channel.rs +++ b/src/batch/src/task/broadcast_channel.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::fmt::{Debug, Formatter}; -use std::future::Future; use risingwave_common::array::DataChunk; use risingwave_common::error::ErrorCode::InternalError; @@ -43,20 +42,16 @@ impl Debug for BroadcastSender { } impl ChanSender for BroadcastSender { - type SendFuture<'a> = impl Future> + 'a; - - fn send(&mut self, chunk: Option) -> Self::SendFuture<'_> { - async move { - let broadcast_data_chunk = chunk.map(DataChunkInChannel::new); - for sender in &self.senders { - sender - .send(broadcast_data_chunk.as_ref().cloned()) - .await - .map_err(|_| SenderError)? - } - - Ok(()) + async fn send(&mut self, chunk: Option) -> BatchResult<()> { + let broadcast_data_chunk = chunk.map(DataChunkInChannel::new); + for sender in &self.senders { + sender + .send(broadcast_data_chunk.as_ref().cloned()) + .await + .map_err(|_| SenderError)? } + + Ok(()) } } @@ -66,15 +61,11 @@ pub struct BroadcastReceiver { } impl ChanReceiver for BroadcastReceiver { - type RecvFuture<'a> = impl Future>> + 'a; - - fn recv(&mut self) -> Self::RecvFuture<'_> { - async move { - match self.receiver.recv().await { - Some(data_chunk) => Ok(data_chunk), - // Early close should be treated as an error. - None => Err(InternalError("broken broadcast_channel".to_string()).into()), - } + async fn recv(&mut self) -> Result> { + match self.receiver.recv().await { + Some(data_chunk) => Ok(data_chunk), + // Early close should be treated as an error. + None => Err(InternalError("broken broadcast_channel".to_string()).into()), } } } diff --git a/src/batch/src/task/channel.rs b/src/batch/src/task/channel.rs index 06332f2067a6e..eb598b8d6f5cf 100644 --- a/src/batch/src/task/channel.rs +++ b/src/batch/src/task/channel.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::future::Future; - use risingwave_common::array::DataChunk; use risingwave_common::error::Result; use risingwave_pb::batch_plan::exchange_info::DistributionMode as ShuffleDistributionMode; @@ -31,13 +29,10 @@ use crate::task::hash_shuffle_channel::{ }; pub(super) trait ChanSender: Send { - type SendFuture<'a>: Future> + Send - where - Self: 'a; /// This function will block until there's enough resource to process the chunk. /// Currently, it will only be called from single thread. /// `None` is sent as a mark of the ending of channel. - fn send(&mut self, chunk: Option) -> Self::SendFuture<'_>; + async fn send(&mut self, chunk: Option) -> BatchResult<()>; } #[derive(Debug, Clone)] @@ -60,12 +55,9 @@ impl ChanSenderImpl { } pub(super) trait ChanReceiver: Send { - type RecvFuture<'a>: Future>> + Send - where - Self: 'a; /// Returns `None` if there's no more data to read. /// Otherwise it will wait until there's data. - fn recv(&mut self) -> Self::RecvFuture<'_>; + async fn recv(&mut self) -> Result>; } pub enum ChanReceiverImpl { diff --git a/src/batch/src/task/consistent_hash_shuffle_channel.rs b/src/batch/src/task/consistent_hash_shuffle_channel.rs index 5ac15257cb074..a45a74a8bb567 100644 --- a/src/batch/src/task/consistent_hash_shuffle_channel.rs +++ b/src/batch/src/task/consistent_hash_shuffle_channel.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::fmt::{Debug, Formatter}; -use std::future::Future; use std::ops::BitAnd; use std::option::Option; @@ -108,14 +107,10 @@ fn generate_new_data_chunks( } impl ChanSender for ConsistentHashShuffleSender { - type SendFuture<'a> = impl Future> + 'a; - - fn send(&mut self, chunk: Option) -> Self::SendFuture<'_> { - async move { - match chunk { - Some(c) => self.send_chunk(c).await, - None => self.send_done().await, - } + async fn send(&mut self, chunk: Option) -> BatchResult<()> { + match chunk { + Some(c) => self.send_chunk(c).await, + None => self.send_done().await, } } } @@ -153,15 +148,11 @@ impl ConsistentHashShuffleSender { } impl ChanReceiver for ConsistentHashShuffleReceiver { - type RecvFuture<'a> = impl Future>> + 'a; - - fn recv(&mut self) -> Self::RecvFuture<'_> { - async move { - match self.receiver.recv().await { - Some(data_chunk) => Ok(data_chunk), - // Early close should be treated as error. - None => Err(InternalError("broken hash_shuffle_channel".to_string()).into()), - } + async fn recv(&mut self) -> Result> { + match self.receiver.recv().await { + Some(data_chunk) => Ok(data_chunk), + // Early close should be treated as error. + None => Err(InternalError("broken hash_shuffle_channel".to_string()).into()), } } } diff --git a/src/batch/src/task/fifo_channel.rs b/src/batch/src/task/fifo_channel.rs index 3d6ae1fe8a6da..84d2764db6e6b 100644 --- a/src/batch/src/task/fifo_channel.rs +++ b/src/batch/src/task/fifo_channel.rs @@ -13,7 +13,6 @@ // limitations under the License. 
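A note on the `src/batch/src/task/` hunks here and below: every channel trait impl previously spelled out a generic associated future (`type SendFuture<'a>: Future<Output = ...>`) and wrapped its body in `async move { ... }`; the patch rewrites them as `async fn` directly in the trait, enabled by the `#![feature(async_fn_in_trait)]` and `#![allow(incomplete_features)]` gates added to `src/batch/src/lib.rs`. A condensed, self-contained sketch of the pattern (nightly-only at the time; `VecSender` is a made-up impl standing in for the fifo/shuffle/broadcast senders):

```rust
#![feature(async_fn_in_trait)] // nightly gate, as added in src/batch/src/lib.rs
#![allow(incomplete_features)]

struct DataChunk;

// After the patch: the trait states its contract directly, no GAT future type.
trait ChanSender: Send {
    /// `None` is sent as a mark of the ending of the channel, mirroring the real trait.
    async fn send(&mut self, chunk: Option<DataChunk>) -> Result<(), String>;
}

// Hypothetical in-memory sender, for illustration only.
struct VecSender {
    buf: Vec<DataChunk>,
    closed: bool,
}

impl ChanSender for VecSender {
    async fn send(&mut self, chunk: Option<DataChunk>) -> Result<(), String> {
        // The body is written directly; before the patch this whole block sat
        // inside `async move { ... }` returned as `Self::SendFuture<'_>`.
        match chunk {
            Some(c) => {
                self.buf.push(c);
                Ok(())
            }
            None => {
                self.closed = true;
                Ok(())
            }
        }
    }
}

fn main() {}
```

The behavior is unchanged; the rewrite only removes the boilerplate that the GAT-based encoding of async traits required.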
use std::fmt::{Debug, Formatter}; -use std::future::Future; use risingwave_common::array::DataChunk; use risingwave_common::error::ErrorCode::InternalError; @@ -40,28 +39,20 @@ pub struct FifoReceiver { } impl ChanSender for FifoSender { - type SendFuture<'a> = impl Future> + 'a; - - fn send(&mut self, chunk: Option) -> Self::SendFuture<'_> { - async { - self.sender - .send(chunk.map(DataChunkInChannel::new)) - .await - .map_err(|_| SenderError) - } + async fn send(&mut self, chunk: Option) -> BatchResult<()> { + self.sender + .send(chunk.map(DataChunkInChannel::new)) + .await + .map_err(|_| SenderError) } } impl ChanReceiver for FifoReceiver { - type RecvFuture<'a> = impl Future>> + 'a; - - fn recv(&mut self) -> Self::RecvFuture<'_> { - async move { - match self.receiver.recv().await { - Some(data_chunk) => Ok(data_chunk), - // Early close should be treated as error. - None => Err(InternalError("broken fifo_channel".to_string()).into()), - } + async fn recv(&mut self) -> Result> { + match self.receiver.recv().await { + Some(data_chunk) => Ok(data_chunk), + // Early close should be treated as error. + None => Err(InternalError("broken fifo_channel".to_string()).into()), } } } diff --git a/src/batch/src/task/hash_shuffle_channel.rs b/src/batch/src/task/hash_shuffle_channel.rs index cb4edbfa3f618..99934cf84d44f 100644 --- a/src/batch/src/task/hash_shuffle_channel.rs +++ b/src/batch/src/task/hash_shuffle_channel.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::fmt::{Debug, Formatter}; -use std::future::Future; use std::ops::BitAnd; use std::option::Option; @@ -105,14 +104,10 @@ fn generate_new_data_chunks( } impl ChanSender for HashShuffleSender { - type SendFuture<'a> = impl Future> + 'a; - - fn send(&mut self, chunk: Option) -> Self::SendFuture<'_> { - async move { - match chunk { - Some(c) => self.send_chunk(c).await, - None => self.send_done().await, - } + async fn send(&mut self, chunk: Option) -> BatchResult<()> { + match chunk { + Some(c) => self.send_chunk(c).await, + None => self.send_done().await, } } } @@ -150,15 +145,11 @@ impl HashShuffleSender { } impl ChanReceiver for HashShuffleReceiver { - type RecvFuture<'a> = impl Future>> + 'a; - - fn recv(&mut self) -> Self::RecvFuture<'_> { - async move { - match self.receiver.recv().await { - Some(data_chunk) => Ok(data_chunk), - // Early close should be treated as error. - None => Err(InternalError("broken hash_shuffle_channel".to_string()).into()), - } + async fn recv(&mut self) -> Result> { + match self.receiver.recv().await { + Some(data_chunk) => Ok(data_chunk), + // Early close should be treated as error. 
+            None => Err(InternalError("broken hash_shuffle_channel".to_string()).into()),
         }
     }
 }
diff --git a/src/common/src/catalog/column.rs b/src/common/src/catalog/column.rs
index 62063c19e62ce..63235b643c585 100644
--- a/src/common/src/catalog/column.rs
+++ b/src/common/src/catalog/column.rs
@@ -15,9 +15,7 @@
 use std::borrow::Cow;
 
 use itertools::Itertools;
-use risingwave_pb::plan_common::{
-    ColumnCatalog as ProstColumnCatalog, ColumnDesc as ProstColumnDesc,
-};
+use risingwave_pb::plan_common::{PbColumnCatalog, PbColumnDesc};
 
 use super::row_id_column_desc;
 use crate::catalog::{Field, ROW_ID_COLUMN_ID};
@@ -104,8 +102,8 @@ impl ColumnDesc {
     }
 
     /// Convert to proto
-    pub fn to_protobuf(&self) -> ProstColumnDesc {
-        ProstColumnDesc {
+    pub fn to_protobuf(&self) -> PbColumnDesc {
+        PbColumnDesc {
             column_type: Some(self.data_type.to_protobuf()),
             column_id: self.column_id.get_id(),
             name: self.name.clone(),
@@ -199,8 +197,8 @@ impl ColumnDesc {
     }
 }
 
-impl From<ProstColumnDesc> for ColumnDesc {
-    fn from(prost: ProstColumnDesc) -> Self {
+impl From<PbColumnDesc> for ColumnDesc {
+    fn from(prost: PbColumnDesc) -> Self {
         let field_descs: Vec<ColumnDesc> = prost
             .field_descs
             .into_iter()
@@ -216,13 +214,13 @@ impl From<ProstColumnDesc> for ColumnDesc {
     }
 }
 
-impl From<&ProstColumnDesc> for ColumnDesc {
-    fn from(prost: &ProstColumnDesc) -> Self {
+impl From<&PbColumnDesc> for ColumnDesc {
+    fn from(prost: &PbColumnDesc) -> Self {
         prost.clone().into()
     }
 }
 
-impl From<&ColumnDesc> for ProstColumnDesc {
+impl From<&ColumnDesc> for PbColumnDesc {
     fn from(c: &ColumnDesc) -> Self {
         Self {
             column_type: c.data_type.to_protobuf().into(),
@@ -262,8 +260,8 @@ impl ColumnCatalog {
     }
 
     /// Convert column catalog to proto
-    pub fn to_protobuf(&self) -> ProstColumnCatalog {
-        ProstColumnCatalog {
+    pub fn to_protobuf(&self) -> PbColumnCatalog {
+        PbColumnCatalog {
             column_desc: Some(self.column_desc.to_protobuf()),
             is_hidden: self.is_hidden,
         }
@@ -278,8 +276,8 @@ impl ColumnCatalog {
     }
 }
 
-impl From<ProstColumnCatalog> for ColumnCatalog {
-    fn from(prost: ProstColumnCatalog) -> Self {
+impl From<PbColumnCatalog> for ColumnCatalog {
+    fn from(prost: PbColumnCatalog) -> Self {
         Self {
             column_desc: prost.column_desc.unwrap().into(),
             is_hidden: prost.is_hidden,
@@ -329,22 +327,22 @@ pub fn is_column_ids_dedup(columns: &[ColumnCatalog]) -> bool {
 
 #[cfg(test)]
 pub mod tests {
-    use risingwave_pb::plan_common::ColumnDesc as ProstColumnDesc;
+    use risingwave_pb::plan_common::PbColumnDesc;
 
     use crate::catalog::ColumnDesc;
     use crate::test_prelude::*;
     use crate::types::DataType;
 
-    pub fn build_prost_desc() -> ProstColumnDesc {
+    pub fn build_prost_desc() -> PbColumnDesc {
         let city = vec![
-            ProstColumnDesc::new_atomic(DataType::Varchar.to_protobuf(), "country.city.address", 2),
-            ProstColumnDesc::new_atomic(DataType::Varchar.to_protobuf(), "country.city.zipcode", 3),
+            PbColumnDesc::new_atomic(DataType::Varchar.to_protobuf(), "country.city.address", 2),
+            PbColumnDesc::new_atomic(DataType::Varchar.to_protobuf(), "country.city.zipcode", 3),
         ];
         let country = vec![
-            ProstColumnDesc::new_atomic(DataType::Varchar.to_protobuf(), "country.address", 1),
-            ProstColumnDesc::new_struct("country.city", 4, ".test.City", city),
+            PbColumnDesc::new_atomic(DataType::Varchar.to_protobuf(), "country.address", 1),
+            PbColumnDesc::new_struct("country.city", 4, ".test.City", city),
         ];
-        ProstColumnDesc::new_struct("country", 5, ".test.Country", country)
+        PbColumnDesc::new_struct("country", 5, ".test.Country", country)
     }
 
     pub fn build_desc() -> ColumnDesc {
diff --git a/src/common/src/catalog/physical_table.rs b/src/common/src/catalog/physical_table.rs
index 8ace48915b0bd..c062f68929cc4 100644 --- a/src/common/src/catalog/physical_table.rs +++ b/src/common/src/catalog/physical_table.rs @@ -15,7 +15,8 @@ use std::collections::HashMap; use fixedbitset::FixedBitSet; -use risingwave_pb::plan_common::{ColumnOrder, StorageTableDesc}; +use risingwave_pb::common::PbColumnOrder; +use risingwave_pb::plan_common::StorageTableDesc; use super::{ColumnDesc, ColumnId, TableId}; use crate::util::sort_util::OrderPair; @@ -53,7 +54,7 @@ pub struct TableDesc { } impl TableDesc { - pub fn arrange_key_orders_prost(&self) -> Vec<ColumnOrder> { + pub fn arrange_key_orders_protobuf(&self) -> Vec<PbColumnOrder> { // Set materialize key as arrange key + pk self.pk.iter().map(|x| x.to_protobuf()).collect() } diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 6e44279fcfe8a..c747d49a1c72c 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -17,10 +17,12 @@ //! [`RwConfig`] corresponds to the whole config file and each other config struct corresponds to a //! section in `risingwave.toml`. +use std::collections::HashMap; use std::fs; use clap::ValueEnum; use serde::{Deserialize, Serialize}; +use serde_json::Value; /// Use the maximum value for HTTP/2 connection window size to avoid deadlock among multiplexed /// streams on the same connection. @@ -31,6 +33,36 @@ pub const STREAM_WINDOW_SIZE: u32 = 32 * 1024 * 1024; // 32 MB /// For non-user-facing components where the CLI arguments do not override the config file. pub const NO_OVERRIDE: Option<NoOverride> = None; +macro_rules! for_all_config_sections { + ($macro:ident) => { + $macro! { + { server }, + { meta }, + { batch }, + { streaming }, + { storage }, + { storage.file_cache }, + } + }; +} + +macro_rules! impl_warn_unrecognized_fields { + ($({ $($field_path:ident).+ },)*) => { + fn warn_unrecognized_fields(config: &RwConfig) { + if !config.unrecognized.is_empty() { + tracing::warn!("unrecognized fields in config: {:?}", config.unrecognized.keys()); + } + $( + if !config.$($field_path).+.unrecognized.is_empty() { + tracing::warn!("unrecognized fields in config section [{}]: {:?}", stringify!($($field_path).+), config.$($field_path).+.unrecognized.keys()); + } + )* + } + }; +} + +for_all_config_sections!(impl_warn_unrecognized_fields); + pub fn load_config(path: &str, cli_override: Option<impl OverrideConfig>) -> RwConfig where { @@ -42,10 +74,10 @@ where .unwrap_or_else(|e| panic!("failed to open config file '{}': {}", path, e)); toml::from_str(config_str.as_str()).unwrap_or_else(|e| panic!("parse error {}", e)) }; - // TODO(zhidong): warn deprecated config if let Some(cli_override) = cli_override { cli_override.r#override(&mut config); } + warn_unrecognized_fields(&config); config } @@ -82,6 +114,9 @@ pub struct RwConfig { #[serde(default)] pub backup: BackupConfig, + + #[serde(flatten)] + pub unrecognized: HashMap<String, Value>, } #[derive(Copy, Clone, Debug, Default, ValueEnum, Serialize, Deserialize)] @@ -93,7 +128,6 @@ pub enum MetaBackend { /// The section `[meta]` in `risingwave.toml`. #[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(deny_unknown_fields)] pub struct MetaConfig { /// Threshold used by worker node to filter out new SSTs when scanning object store, during /// full SST GC. @@ -150,6 +184,9 @@ pub struct MetaConfig { /// Schedule ttl_reclaim compaction for all compaction groups with this interval.
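// NOTE: every `unrecognized` field added in this file uses serde's catch-all idiom: a
// `#[serde(flatten)]`-ed map collects any keys that match no declared field, replacing
// the old hard failure from `#[serde(deny_unknown_fields)]`. A minimal sketch under the
// same imports (`std::collections::HashMap`, `serde_json::Value`):
//
//     #[derive(serde::Deserialize)]
//     struct Section {
//         known: u32,
//         #[serde(flatten)]
//         unrecognized: HashMap<String, Value>,
//     }
//
//     // toml::from_str::<Section>("known = 1\ntypo_key = 2") now succeeds, and
//     // "typo_key" lands in `unrecognized` for `warn_unrecognized_fields` to report.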
#[serde(default = "default::meta::periodic_ttl_reclaim_compaction_interval_sec")] pub periodic_ttl_reclaim_compaction_interval_sec: u64, + + #[serde(flatten)] + pub unrecognized: HashMap<String, Value>, } impl Default for MetaConfig { @@ -160,7 +197,6 @@ impl Default for MetaConfig { /// The section `[server]` in `risingwave.toml`. #[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(deny_unknown_fields)] pub struct ServerConfig { /// The interval for periodic heartbeat from worker to the meta service. #[serde(default = "default::server::heartbeat_interval_ms")] pub heartbeat_interval_ms: u32, @@ -178,6 +214,9 @@ pub struct ServerConfig { /// 0 = close metrics /// >0 = open metrics pub metrics_level: u32, + + #[serde(flatten)] + pub unrecognized: HashMap<String, Value>, } impl Default for ServerConfig { @@ -188,7 +227,6 @@ impl Default for ServerConfig { /// The section `[batch]` in `risingwave.toml`. #[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(deny_unknown_fields)] pub struct BatchConfig { /// The thread number of the batch task runtime in the compute node. The default value is /// decided by `tokio`. @@ -200,6 +238,9 @@ pub struct BatchConfig { #[serde(default)] pub distributed_query_limit: Option<u64>, + + #[serde(flatten)] + pub unrecognized: HashMap<String, Value>, } impl Default for BatchConfig { @@ -210,7 +251,6 @@ impl Default for BatchConfig { /// The section `[streaming]` in `risingwave.toml`. #[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(deny_unknown_fields)] pub struct StreamingConfig { /// The interval of periodic barrier. #[serde(default = "default::streaming::barrier_interval_ms")] pub barrier_interval_ms: u32, @@ -243,6 +283,9 @@ pub struct StreamingConfig { /// Max unique user stream errors per actor #[serde(default = "default::streaming::unique_user_stream_errors")] pub unique_user_stream_errors: usize, + + #[serde(flatten)] + pub unrecognized: HashMap<String, Value>, } impl Default for StreamingConfig { @@ -253,7 +296,6 @@ impl Default for StreamingConfig { /// The section `[storage]` in `risingwave.toml`. #[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(deny_unknown_fields)] pub struct StorageConfig { // TODO(zhidong): Remove in 0.1.18 release // NOTE: It is now a system parameter and should not be used directly. @@ -340,6 +382,9 @@ pub struct StorageConfig { #[serde(default = "default::storage::max_concurrent_compaction_task_number")] pub max_concurrent_compaction_task_number: u64, + + #[serde(flatten)] + pub unrecognized: HashMap<String, Value>, } impl Default for StorageConfig { @@ -352,7 +397,6 @@ impl Default for StorageConfig { /// /// It's put at [`StorageConfig::file_cache`]. #[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(deny_unknown_fields)] pub struct FileCacheConfig { #[serde(default = "default::file_cache::dir")] pub dir: String, @@ -371,6 +415,9 @@ pub struct FileCacheConfig { #[serde(default = "default::file_cache::cache_file_max_write_size_mb")] pub cache_file_max_write_size_mb: usize, + + #[serde(flatten)] + pub unrecognized: HashMap<String, Value>, } impl Default for FileCacheConfig { @@ -391,7 +438,6 @@ pub enum AsyncStackTraceOption { /// /// It is put at [`BatchConfig::developer`] and [`StreamingConfig::developer`]. #[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(deny_unknown_fields)] pub struct DeveloperConfig { /// The size of the channel used for output to exchange/shuffle. #[serde(default = "default::developer::batch_output_channel_size")] pub batch_output_channel_size: usize, @@ -429,6 +475,9 @@ pub struct DeveloperConfig { /// in remote exchange.
#[serde(default = "default::developer::stream_exchange_batched_permits")] pub stream_exchange_batched_permits: usize, + + #[serde(flatten)] + pub unrecognized: HashMap<String, Value>, } impl Default for DeveloperConfig { @@ -439,7 +488,6 @@ impl Default for DeveloperConfig { /// Configs for meta node backup #[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(deny_unknown_fields)] pub struct BackupConfig { // TODO: Remove in 0.1.18 release // NOTE: It is now a system parameter and should not be used directly. @@ -451,6 +499,9 @@ pub struct BackupConfig { /// Remote directory for storing snapshots. #[serde(default = "default::backup::storage_directory")] pub storage_directory: String, + + #[serde(flatten)] + pub unrecognized: HashMap<String, Value>, } impl Default for BackupConfig { diff --git a/src/common/src/util/sort_util.rs b/src/common/src/util/sort_util.rs index 146eae6e85476..234a7c39261a2 100644 --- a/src/common/src/util/sort_util.rs +++ b/src/common/src/util/sort_util.rs @@ -15,7 +15,7 @@ use std::cmp::{Ord, Ordering}; use std::sync::Arc; -use risingwave_pb::plan_common::{ColumnOrder, OrderType as ProstOrderType}; +use risingwave_pb::common::{PbColumnOrder, PbDirection, PbOrderType}; use crate::array::{Array, ArrayImpl, DataChunk}; use crate::error::ErrorCode::InternalError; @@ -28,25 +28,27 @@ pub enum OrderType { } impl OrderType { - pub fn from_prost(order_type: &ProstOrderType) -> OrderType { + // TODO(rc): from `PbOrderType` + pub fn from_protobuf(order_type: &PbDirection) -> OrderType { match order_type { - ProstOrderType::Ascending => OrderType::Ascending, - ProstOrderType::Descending => OrderType::Descending, - ProstOrderType::OrderUnspecified => unreachable!(), + PbDirection::Ascending => OrderType::Ascending, + PbDirection::Descending => OrderType::Descending, + PbDirection::Unspecified => unreachable!(), } } - pub fn to_prost(self) -> ProstOrderType { + // TODO(rc): to `PbOrderType` + pub fn to_protobuf(self) -> PbDirection { match self { - OrderType::Ascending => ProstOrderType::Ascending, - OrderType::Descending => ProstOrderType::Descending, + OrderType::Ascending => PbDirection::Ascending, + OrderType::Descending => PbDirection::Descending, } } } /// Column index with an order type (ASC or DESC). Used to represent a sort key (`Vec<OrderPair>`). /// -/// Corresponds to protobuf [`ColumnOrder`]. +/// Corresponds to protobuf [`PbColumnOrder`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct OrderPair { pub column_idx: usize, @@ -61,18 +63,21 @@ impl OrderPair { } } - pub fn from_prost(column_order: &ColumnOrder) -> Self { - let order_type: ProstOrderType = ProstOrderType::from_i32(column_order.order_type).unwrap(); + pub fn from_protobuf(column_order: &PbColumnOrder) -> Self { OrderPair { - order_type: OrderType::from_prost(&order_type), - column_idx: column_order.index as usize, + column_idx: column_order.column_index as _, + order_type: OrderType::from_protobuf( + &column_order.get_order_type().unwrap().direction(), + ), } } - pub fn to_protobuf(&self) -> ColumnOrder { - ColumnOrder { - order_type: self.order_type.to_prost() as i32, - index: self.column_idx as u32, + pub fn to_protobuf(&self) -> PbColumnOrder { + PbColumnOrder { + column_index: self.column_idx as _, + order_type: Some(PbOrderType { + direction: self.order_type.to_protobuf() as _, + }), } } } diff --git a/src/common/src/util/value_encoding/column_aware_row_encoding.rs b/src/common/src/util/value_encoding/column_aware_row_encoding.rs index 5f41316169b83..34cf194b3177a 100644 --- a/src/common/src/util/value_encoding/column_aware_row_encoding.rs +++ b/src/common/src/util/value_encoding/column_aware_row_encoding.rs @@ -75,13 +75,13 @@ impl RowEncoding { self.flag |= Flag::OFFSET16; usize_offsets .iter() - .for_each(|m| self.offsets.put_u16(*m as u16)); + .for_each(|m| self.offsets.put_u16_le(*m as u16)); } _n @ ..=const { u32::MAX as usize } => { self.flag |= Flag::OFFSET32; usize_offsets .iter() - .for_each(|m| self.offsets.put_u32(*m as u32)); + .for_each(|m| self.offsets.put_u32_le(*m as u32)); } _ => unreachable!("encoding length exceeds u32"), } @@ -340,4 +340,43 @@ mod tests { vec![Some(Int16(5)), Some(Utf8("abc".into()))] ); } + #[test] + fn test_row_hard1() { + let column_ids = (0..20000).map(ColumnId::new).collect_vec(); + let row = OwnedRow::new(vec![Some(Int16(233)); 20000]); + let data_types = vec![DataType::Int16; 20000]; + let serde = ColumnAwareSerde::new(&column_ids, Arc::from(data_types.into_boxed_slice())); + let encoded_bytes = serde.serialize(row); + let decoded_row = serde.deserialize(&encoded_bytes); + assert_eq!(decoded_row.unwrap(), vec![Some(Int16(233)); 20000]); + } + #[test] + fn test_row_hard2() { + let column_ids = (0..20000).map(ColumnId::new).collect_vec(); + let mut data = vec![Some(Int16(233)); 5000]; + data.extend(vec![None; 5000]); + data.extend(vec![Some(Utf8("risingwave risingwave".into())); 5000]); + data.extend(vec![None; 5000]); + let row = OwnedRow::new(data.clone()); + let mut data_types = vec![DataType::Int16; 10000]; + data_types.extend(vec![DataType::Varchar; 10000]); + let serde = ColumnAwareSerde::new(&column_ids, Arc::from(data_types.into_boxed_slice())); + let encoded_bytes = serde.serialize(row); + let decoded_row = serde.deserialize(&encoded_bytes); + assert_eq!(decoded_row.unwrap(), data); + } + #[test] + fn test_row_hard3() { + let column_ids = (0..1000000).map(ColumnId::new).collect_vec(); + let mut data = vec![Some(Int64(233)); 500000]; + data.extend(vec![None; 250000]); + data.extend(vec![Some(Utf8("risingwave risingwave".into())); 250000]); + let row = OwnedRow::new(data.clone()); + let mut data_types = vec![DataType::Int64; 500000]; + data_types.extend(vec![DataType::Varchar; 500000]); + let serde = ColumnAwareSerde::new(&column_ids, Arc::from(data_types.into_boxed_slice())); + let encoded_bytes = serde.serialize(row); + let decoded_row = serde.deserialize(&encoded_bytes); + 
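// NOTE: the `put_u16`/`put_u32` -> `put_u16_le`/`put_u32_le` change above pins the
// offset array to little-endian so that the writer matches the reader. A small
// round-trip sketch with the `bytes` crate:
//
//     use bytes::{Buf, BufMut};
//     let mut buf: Vec<u8> = vec![];
//     buf.put_u16_le(300);                      // write offset as little-endian
//     assert_eq!((&buf[..]).get_u16_le(), 300); // reading it back round-trips
//
// Writing big-endian `put_u16(300)` and then reading with `get_u16_le()` would yield
// 11265 instead, silently corrupting the column offsets.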
assert_eq!(decoded_row.unwrap(), data); + } } diff --git a/src/connector/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs index eae14523cb174..1e663b9c8240d 100644 --- a/src/connector/src/sink/catalog/mod.rs +++ b/src/connector/src/sink/catalog/mod.rs @@ -181,7 +181,7 @@ impl From for SinkCatalog { .into_iter() .map(ColumnCatalog::from) .collect_vec(), - pk: pb.pk.iter().map(OrderPair::from_prost).collect_vec(), + pk: pb.pk.iter().map(OrderPair::from_protobuf).collect_vec(), stream_key: pb.stream_key.iter().map(|k| *k as _).collect_vec(), distribution_key: pb.distribution_key.iter().map(|k| *k as _).collect_vec(), properties: pb.properties.clone(), diff --git a/src/ctl/src/cmd_impl/hummock/sst_dump.rs b/src/ctl/src/cmd_impl/hummock/sst_dump.rs index 40570c4d17fec..17f60fc6d75fc 100644 --- a/src/ctl/src/cmd_impl/hummock/sst_dump.rs +++ b/src/ctl/src/cmd_impl/hummock/sst_dump.rs @@ -15,19 +15,23 @@ use std::collections::HashMap; use bytes::{Buf, Bytes}; +use chrono::offset::Utc; +use chrono::DateTime; +use clap::Args; use itertools::Itertools; use risingwave_common::row::{Row, RowDeserializer}; use risingwave_common::types::to_text::ToText; +use risingwave_common::util::epoch::Epoch; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_frontend::TableCatalog; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockVersionExt; use risingwave_hummock_sdk::key::FullKey; -use risingwave_hummock_sdk::HummockSstableId; use risingwave_object_store::object::BlockLocation; +use risingwave_pb::hummock::{Level, SstableInfo}; use risingwave_rpc_client::MetaClient; use risingwave_storage::hummock::value::HummockValue; use risingwave_storage::hummock::{ - Block, BlockHolder, BlockIterator, CompressionAlgorithm, SstableMeta, SstableStore, + Block, BlockHolder, BlockIterator, CompressionAlgorithm, Sstable, SstableStore, }; use risingwave_storage::monitor::StoreLocalStatistic; @@ -35,7 +39,28 @@ use crate::CtlContext; type TableData = HashMap; -pub async fn sst_dump(context: &CtlContext) -> anyhow::Result<()> { +#[derive(Args, Debug)] +pub struct SstDumpArgs { + #[clap(short, long = "sst-id")] + sst_id: Option, + #[clap(short, long = "block-id")] + block_id: Option, + #[clap(short = 'p', long = "print-entries")] + print_entries: bool, + #[clap(short = 'l', long = "print-level-info")] + print_level: bool, +} + +fn print_level(level: &Level) { + println!("Level Type: {}", level.level_type); + println!("Level Idx: {}", level.level_idx); + if level.level_idx == 0 { + println!("L0 Sub-Level Idx: {}", level.sub_level_id); + } +} + +pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result<()> { + println!("Start sst dump with args: {:?}", args); // Retrieves the Sstable store so we can access the SstableMeta let meta_client = context.meta_client().await?; let hummock = context.hummock_store().await?; @@ -43,37 +68,91 @@ pub async fn sst_dump(context: &CtlContext) -> anyhow::Result<()> { let table_data = load_table_schemas(&meta_client).await?; let sstable_store = &*hummock.sstable_store(); + + // TODO: We can avoid reading meta if `print_level` is false with the new block format. 
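// NOTE: with `SstDumpArgs` derived via clap above, the subcommand now takes targeted
// flags instead of dumping everything. A hedged usage sketch (the `risectl` binary and
// subcommand spelling are assumptions based on the surrounding crate):
//
//     $ risectl hummock sst-dump --sst-id 42 --block-id 0 -p -l
//
// parses to SstDumpArgs { sst_id: Some(42), block_id: Some(0), print_entries: true,
// print_level: true }, restricting the dump to a single SST and block.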
for level in version.get_combined_levels() { for sstable_info in &level.table_infos { - let id = sstable_info.id; - - let sstable_cache = sstable_store - .sstable(sstable_info, &mut StoreLocalStatistic::default()) - .await?; - let sstable = sstable_cache.value().as_ref(); - let sstable_meta = &sstable.meta; - - println!("SST id: {}", id); - println!("-------------------------------------"); - println!("Level: {}", level.level_type); - println!("File Size: {}", sstable_info.file_size); - - if let Some(key_range) = sstable_info.key_range.as_ref() { - println!("Key Range:"); - println!( - "\tleft:\t{:?}\n\tright:\t{:?}\n\t", - key_range.left, key_range.right, - ); + if let Some(sst_id) = &args.sst_id { + if *sst_id == sstable_info.id { + if args.print_level { + print_level(level); + } + + sst_dump_via_sstable_store( + sstable_store, + sstable_info.id, + sstable_info.meta_offset, + sstable_info.file_size, + &table_data, + &args, + ) + .await?; + return Ok(()); + } } else { - println!("Key Range: None"); + if args.print_level { + print_level(level); + } + + sst_dump_via_sstable_store( + sstable_store, + sstable_info.id, + sstable_info.meta_offset, + sstable_info.file_size, + &table_data, + &args, + ) + .await?; } + } + } + Ok(()) +} - println!("Estimated Table Size: {}", sstable_meta.estimated_size); - println!("Bloom Filter Size: {}", sstable_meta.bloom_filter.len()); - println!("Key Count: {}", sstable_meta.key_count); - println!("Version: {}", sstable_meta.version); - - print_blocks(id, &table_data, sstable_store, sstable_meta).await?; +pub async fn sst_dump_via_sstable_store( + sstable_store: &SstableStore, + sst_id: u64, + meta_offset: u64, + file_size: u64, + table_data: &TableData, + args: &SstDumpArgs, +) -> anyhow::Result<()> { + let sstable_info = SstableInfo { + id: sst_id, + meta_offset, + file_size, + ..Default::default() + }; + let sstable_cache = sstable_store + .sstable(&sstable_info, &mut StoreLocalStatistic::default()) + .await?; + let sstable = sstable_cache.value().as_ref(); + let sstable_meta = &sstable.meta; + + println!("SST id: {}", sst_id); + println!("-------------------------------------"); + println!("File Size: {}", sstable.estimate_size()); + + println!("Key Range:"); + println!( + "\tleft:\t{:?}\n\tright:\t{:?}\n\t", + sstable_meta.smallest_key, sstable_meta.largest_key, + ); + + println!("Estimated Table Size: {}", sstable_meta.estimated_size); + println!("Bloom Filter Size: {}", sstable_meta.bloom_filter.len()); + println!("Key Count: {}", sstable_meta.key_count); + println!("Version: {}", sstable_meta.version); + + println!("SST Block Count: {}", sstable.block_count()); + for i in 0..sstable.block_count() { + if let Some(block_id) = &args.block_id { + if *block_id == i as u64 { + print_block(i, table_data, sstable_store, sstable, args).await?; + return Ok(()); + } + } else { + print_block(i, table_data, sstable_store, sstable, args).await?; } } Ok(()) @@ -92,38 +171,39 @@ async fn load_table_schemas(meta_client: &MetaClient) -> anyhow::Result anyhow::Result<()> { - let data_path = sstable_store.get_sst_data_path(id); - - println!("Blocks:"); - for (i, block_meta) in sstable_meta.block_metas.iter().enumerate() { - println!("\tBlock {}", i); - println!("\t-----------"); - - // Retrieve encoded block data in bytes - let store = sstable_store.store(); - let block_loc = BlockLocation { - offset: block_meta.offset as usize, - size: block_meta.len as usize, - }; - let block_data = store.read(&data_path, Some(block_loc)).await?; + println!("\tBlock {}", block_idx); + 
println!("\t-----------"); - // Retrieve checksum and compression algorithm used from the encoded block data - let len = block_data.len(); - let checksum = (&block_data[len - 8..]).get_u64_le(); - let compression = CompressionAlgorithm::decode(&mut &block_data[len - 9..len - 8])?; + let block_meta = &sst.meta.block_metas[block_idx]; + let data_path = sstable_store.get_sst_data_path(sst.id); - println!( - "\tOffset: {}, Size: {}, Checksum: {}, Compression Algorithm: {:?}", - block_meta.offset, block_meta.len, checksum, compression - ); + // Retrieve encoded block data in bytes + let store = sstable_store.store(); + let block_loc = BlockLocation { + offset: block_meta.offset as usize, + size: block_meta.len as usize, + }; + let block_data = store.read(&data_path, Some(block_loc)).await?; + + // Retrieve checksum and compression algorithm used from the encoded block data + let len = block_data.len(); + let checksum = (&block_data[len - 8..]).get_u64_le(); + let compression = CompressionAlgorithm::decode(&mut &block_data[len - 9..len - 8])?; + println!( + "\tOffset: {}, Size: {}, Checksum: {}, Compression Algorithm: {:?}", + block_meta.offset, block_meta.len, checksum, compression + ); + + if args.print_entries { print_kv_pairs( block_data, table_data, @@ -159,13 +239,18 @@ fn print_kv_pairs( HummockValue::Delete => (false, &[] as &[u8]), }; - let epoch = full_key.epoch; + let epoch = Epoch::from(full_key.epoch); + let date_time = DateTime::::from(epoch.as_system_time()); - println!("\t\t full key: {:02x?}", raw_full_key); - println!("\t\tfull value: {:02x?}", full_val); + println!( + "\t\t full key: {:02x?}, len={}", + raw_full_key, + raw_full_key.len() + ); + println!("\t\tfull value: {:02x?}, len={}", full_val, full_val.len()); println!("\t\t user key: {:02x?}", raw_user_key); println!("\t\tuser value: {:02x?}", user_val); - println!("\t\t epoch: {}", epoch); + println!("\t\t epoch: {} ({})", epoch, date_time); println!("\t\t type: {}", if is_put { "Put" } else { "Delete" }); print_table_column(full_key, user_val, table_data, is_put)?; diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs index fbc214b919bef..7e4e10d70ca1f 100644 --- a/src/ctl/src/lib.rs +++ b/src/ctl/src/lib.rs @@ -15,6 +15,7 @@ use anyhow::Result; use clap::{Parser, Subcommand}; use cmd_impl::bench::BenchCommands; +use cmd_impl::hummock::SstDumpArgs; use crate::cmd_impl::hummock::{ build_compaction_config_vec, list_pinned_snapshots, list_pinned_versions, @@ -95,7 +96,7 @@ enum HummockCommands { #[clap(short, long = "table-id")] table_id: u32, }, - SstDump, + SstDump(SstDumpArgs), /// trigger a targeted compaction through compaction_group_id TriggerManualCompaction { #[clap(short, long = "compaction-group-id", default_value_t = 2)] @@ -229,8 +230,8 @@ pub async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { Commands::Hummock(HummockCommands::ListKv { epoch, table_id }) => { cmd_impl::hummock::list_kv(context, epoch, table_id).await?; } - Commands::Hummock(HummockCommands::SstDump) => { - cmd_impl::hummock::sst_dump(context).await.unwrap() + Commands::Hummock(HummockCommands::SstDump(args)) => { + cmd_impl::hummock::sst_dump(context, args).await.unwrap() } Commands::Hummock(HummockCommands::TriggerManualCompaction { compaction_group_id, diff --git a/src/expr/src/vector_op/agg/aggregator.rs b/src/expr/src/vector_op/agg/aggregator.rs index b9b1fe3d6802f..5e932dfe0d806 100644 --- a/src/expr/src/vector_op/agg/aggregator.rs +++ b/src/expr/src/vector_op/agg/aggregator.rs @@ -20,7 +20,6 @@ use 
risingwave_common::bail; use risingwave_common::types::*; use risingwave_common::util::sort_util::{OrderPair, OrderType}; use risingwave_pb::expr::AggCall; -use risingwave_pb::plan_common::OrderType as ProstOrderType; use crate::expr::{build_from_prost, AggKind}; use crate::vector_op::agg::approx_count_distinct::ApproxCountDistinct; @@ -72,12 +71,12 @@ impl AggStateFactory { let agg_kind = AggKind::try_from(prost.get_type()?)?; let distinct = prost.distinct; let order_pairs = prost - .get_order_by_fields() + .get_order_by() .iter() - .map(|field| { - let col_idx = field.get_input() as usize; + .map(|col_order| { + let col_idx = col_order.get_column_index() as usize; let order_type = - OrderType::from_prost(&ProstOrderType::from_i32(field.direction).unwrap()); + OrderType::from_protobuf(&col_order.get_order_type().unwrap().direction()); // TODO(yuchao): `nulls first/last` is not supported yet, so it's ignore here, // see also `risingwave_common::util::sort_util::compare_values` OrderPair::new(col_idx, order_type) diff --git a/src/frontend/planner_test/tests/testdata/join.yaml b/src/frontend/planner_test/tests/testdata/join.yaml index dfcd3fc78aebc..e05ea2fd9f635 100644 --- a/src/frontend/planner_test/tests/testdata/join.yaml +++ b/src/frontend/planner_test/tests/testdata/join.yaml @@ -621,3 +621,57 @@ | └─StreamTableScan { table: t, columns: [t.x, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } └─StreamExchange { dist: HashShard(t.x) } └─StreamTableScan { table: t, columns: [t.x, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } +- name: Choose correct distribution key in pk (https://github.com/risingwavelabs/risingwave/issues/7698) + sql: | + create table t (src int, dst int); + select t1.src p1, t1.dst p2, t2.dst p3 from t t1, t t2, t t3 where t1.dst = t2.src and t2.src = t3.dst and t3.dst = t1.src; + stream_plan: | + StreamMaterialize { columns: [p1, p2, p3, t._row_id(hidden), t._row_id#1(hidden), t.src(hidden), t._row_id#2(hidden)], pk_columns: [t._row_id, t._row_id#1, p2, t._row_id#2, t.src, p1], pk_conflict: "no check" } + └─StreamHashJoin { type: Inner, predicate: t.src = t.dst AND t.src = t.dst, output: [t.src, t.dst, t.dst, t._row_id, t._row_id, t.src, t._row_id] } + ├─StreamExchange { dist: HashShard(t.src) } + | └─StreamHashJoin { type: Inner, predicate: t.dst = t.src, output: [t.src, t.dst, t.src, t.dst, t._row_id, t._row_id] } + | ├─StreamExchange { dist: HashShard(t.dst) } + | | └─StreamTableScan { table: t, columns: [t.src, t.dst, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + | └─StreamExchange { dist: HashShard(t.src) } + | └─StreamTableScan { table: t, columns: [t.src, t.dst, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamExchange { dist: HashShard(t.dst) } + └─StreamTableScan { table: t, columns: [t.dst, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + stream_dist_plan: | + Fragment 0 + StreamMaterialize { columns: [p1, p2, p3, t._row_id(hidden), t._row_id#1(hidden), t.src(hidden), t._row_id#2(hidden)], pk_columns: [t._row_id, t._row_id#1, p2, t._row_id#2, t.src, p1], pk_conflict: "no check" } + materialized table: 4294967294 + StreamHashJoin { type: Inner, predicate: t.src = t.dst AND t.src = t.dst, output: [t.src, t.dst, t.dst, t._row_id, t._row_id, t.src, t._row_id] } + left table: 0, right table 2, left degree table: 1, right degree table: 3, + StreamExchange Hash([0]) from 1 + StreamExchange Hash([0]) from 4 + + Fragment 1 + StreamHashJoin { type: Inner, predicate: 
t.dst = t.src, output: [t.src, t.dst, t.src, t.dst, t._row_id, t._row_id] } + left table: 4, right table 6, left degree table: 5, right degree table: 7, + StreamExchange Hash([1]) from 2 + StreamExchange Hash([0]) from 3 + + Fragment 2 + Chain { table: t, columns: [t.src, t.dst, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + Upstream + BatchPlanNode + + Fragment 3 + Chain { table: t, columns: [t.src, t.dst, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + Upstream + BatchPlanNode + + Fragment 4 + Chain { table: t, columns: [t.dst, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + Upstream + BatchPlanNode + + Table 0 { columns: [t_src, t_dst, t_src_0, t_dst_0, t__row_id, t__row_id_0], primary key: [$2 ASC, $0 ASC, $4 ASC, $5 ASC, $1 ASC], value indices: [0, 1, 2, 3, 4, 5], distribution key: [0] } + Table 1 { columns: [t_src, t_src_0, t__row_id, t__row_id_0, t_dst, _degree], primary key: [$0 ASC, $1 ASC, $2 ASC, $3 ASC, $4 ASC], value indices: [5], distribution key: [1] } + Table 2 { columns: [t_dst, t__row_id], primary key: [$0 ASC, $0 ASC, $1 ASC], value indices: [0, 1], distribution key: [0] } + Table 3 { columns: [t_dst, t_dst_0, t__row_id, _degree], primary key: [$0 ASC, $1 ASC, $2 ASC], value indices: [3], distribution key: [1] } + Table 4 { columns: [t_src, t_dst, t__row_id], primary key: [$1 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [1] } + Table 5 { columns: [t_dst, t__row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] } + Table 6 { columns: [t_src, t_dst, t__row_id], primary key: [$0 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [0] } + Table 7 { columns: [t_src, t__row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] } + Table 4294967294 { columns: [p1, p2, p3, t._row_id, t._row_id#1, t.src, t._row_id#2], primary key: [$3 ASC, $4 ASC, $1 ASC, $6 ASC, $5 ASC, $0 ASC], value indices: [0, 1, 2, 3, 4, 5, 6], distribution key: [0] } diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index a037767ba9a13..b5b2664954e53 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -122,8 +122,12 @@ pub struct TableCatalog { /// Per-table catalog version, used by schema change. `None` for internal tables and tests. pub version: Option, - /// the column indices which could receive watermarks. + /// The column indices which could receive watermarks. pub watermark_columns: FixedBitSet, + + /// Optional field specifies the distribution key indices in pk. + /// See https://github.com/risingwavelabs/risingwave/issues/8377 for more information. 
+ pub dist_key_in_pk: Vec, } #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] @@ -363,6 +367,7 @@ impl TableCatalog { read_prefix_len_hint: self.read_prefix_len_hint as u32, version: self.version.as_ref().map(TableVersion::to_prost), watermark_indices: self.watermark_columns.ones().map(|x| x as _).collect_vec(), + dist_key_in_pk: self.dist_key_in_pk.iter().map(|x| *x as _).collect(), handle_pk_conflict_behavior: self.conflict_behavior_type, } } @@ -422,6 +427,7 @@ impl From for TableCatalog { read_prefix_len_hint: tb.read_prefix_len_hint as usize, version: tb.version.map(TableVersion::from_prost), watermark_columns, + dist_key_in_pk: tb.dist_key_in_pk.iter().map(|x| *x as _).collect(), } } } @@ -520,6 +526,7 @@ mod tests { }), watermark_indices: vec![], handle_pk_conflict_behavior: 0, + dist_key_in_pk: vec![], } .into(); @@ -582,6 +589,7 @@ mod tests { read_prefix_len_hint: 0, version: Some(TableVersion::new_initial_for_test(ColumnId::new(1))), watermark_columns: FixedBitSet::with_capacity(2), + dist_key_in_pk: vec![], } ); assert_eq!(table, TableCatalog::from(table.to_prost(0, 0))); diff --git a/src/frontend/src/optimizer/plan_node/generic/agg.rs b/src/frontend/src/optimizer/plan_node/generic/agg.rs index 845a67c72acaf..5f43f535ecfd0 100644 --- a/src/frontend/src/optimizer/plan_node/generic/agg.rs +++ b/src/frontend/src/optimizer/plan_node/generic/agg.rs @@ -20,7 +20,7 @@ use risingwave_common::catalog::{Field, FieldDisplay, Schema}; use risingwave_common::types::DataType; use risingwave_common::util::sort_util::OrderType; use risingwave_expr::expr::AggKind; -use risingwave_pb::expr::agg_call::OrderByField as ProstAggOrderByField; +use risingwave_pb::common::{PbColumnOrder, PbOrderType}; use risingwave_pb::expr::AggCall as ProstAggCall; use risingwave_pb::stream_plan::{agg_call_state, AggCallState as AggCallStateProst}; @@ -278,7 +278,7 @@ impl Agg { vec![(OrderType::Descending, agg_call.inputs[0].index)] } AggKind::StringAgg | AggKind::ArrayAgg => agg_call - .order_by_fields + .order_by .iter() .map(|o| (o.direction.to_order(), o.input.index)) .collect(), @@ -521,11 +521,12 @@ impl fmt::Display for PlanAggOrderByFieldDisplay<'_> { } impl PlanAggOrderByField { - fn to_protobuf(&self) -> ProstAggOrderByField { - ProstAggOrderByField { - input: self.input.index() as _, - direction: self.direction.to_protobuf() as i32, - nulls_first: self.nulls_first, + fn to_protobuf(&self) -> PbColumnOrder { + PbColumnOrder { + column_index: self.input.index() as _, + order_type: Some(PbOrderType { + direction: self.direction.to_protobuf() as _, + }), } } } @@ -551,7 +552,7 @@ pub struct PlanAggCall { pub inputs: Vec, pub distinct: bool, - pub order_by_fields: Vec, + pub order_by: Vec, /// Selective aggregation: only the input rows for which /// `filter` evaluates to `true` will be fed to the aggregate function. 
pub filter: Condition, @@ -571,12 +572,8 @@ impl fmt::Debug for PlanAggCall { write!(f, ",")?; } } - if !self.order_by_fields.is_empty() { - let clause_text = self - .order_by_fields - .iter() - .map(|e| format!("{:?}", e)) - .join(", "); + if !self.order_by.is_empty() { + let clause_text = self.order_by.iter().map(|e| format!("{:?}", e)).join(", "); write!(f, " order_by({})", clause_text)?; } write!(f, ")")?; @@ -599,8 +596,8 @@ impl PlanAggCall { x.index = mapping.map(x.index); }); - // modify order_by_fields - self.order_by_fields.iter_mut().for_each(|x| { + // modify order_by exprs + self.order_by.iter_mut().for_each(|x| { x.input.index = mapping.map(x.input.index); }); @@ -617,8 +614,8 @@ impl PlanAggCall { return_type: Some(self.return_type.to_protobuf()), args: self.inputs.iter().map(InputRef::to_proto).collect(), distinct: self.distinct, - order_by_fields: self - .order_by_fields + order_by: self + .order_by .iter() .map(PlanAggOrderByField::to_protobuf) .collect(), @@ -644,7 +641,7 @@ impl PlanAggCall { PlanAggCall { agg_kind: total_agg_kind, inputs: vec![InputRef::new(partial_output_idx, self.return_type.clone())], - order_by_fields: vec![], // order must make no difference when we use 2-phase agg + order_by: vec![], // order must make no difference when we use 2-phase agg filter: Condition::true_cond(), ..self.clone() } @@ -656,7 +653,7 @@ impl PlanAggCall { return_type: DataType::Int64, inputs: vec![], distinct: false, - order_by_fields: vec![], + order_by: vec![], filter: Condition::true_cond(), } } @@ -698,11 +695,11 @@ impl fmt::Debug for PlanAggCallDisplay<'_> { write!(f, ", ")?; } } - if !that.order_by_fields.is_empty() { + if !that.order_by.is_empty() { write!( f, " order_by({})", - that.order_by_fields.iter().format_with(", ", |e, f| { + that.order_by.iter().format_with(", ", |e, f| { f(&PlanAggOrderByFieldDisplay { plan_agg_order_by_field: e, input_schema: self.input_schema, diff --git a/src/frontend/src/optimizer/plan_node/logical_agg.rs b/src/frontend/src/optimizer/plan_node/logical_agg.rs index d356541a22db6..db92c173e2379 100644 --- a/src/frontend/src/optimizer/plan_node/logical_agg.rs +++ b/src/frontend/src/optimizer/plan_node/logical_agg.rs @@ -324,8 +324,6 @@ impl LogicalAgg { call.agg_kind, AggKind::Min | AggKind::Max | AggKind::Sum | AggKind::Count ) && !call.distinct - // QUESTION: why do we need `&& call.order_by_fields.is_empty()` ? 
- // && call.order_by_fields.is_empty() }) && !self.is_agg_result_affected_by_order() && self.two_phase_agg_enabled() @@ -480,7 +478,7 @@ impl LogicalAggBuilder { if agg_call.distinct { has_distinct = true; } - if !agg_call.order_by_fields.is_empty() { + if !agg_call.order_by.is_empty() { has_order_by = true; } if !agg_call.distinct && agg_call.agg_kind == AggKind::StringAgg { @@ -591,7 +589,7 @@ impl LogicalAggBuilder { ErrorCode::NotImplemented(format!("{err} inside aggregation calls"), None.into()) })?; - let order_by_fields: Vec<_> = order_by + let order_by: Vec<_> = order_by .sort_exprs .iter() .map(|e| { @@ -622,7 +620,7 @@ impl LogicalAggBuilder { return_type: left_return_type, inputs: inputs.clone(), distinct, - order_by_fields: order_by_fields.clone(), + order_by: order_by.clone(), filter: filter.clone(), }); let left = ExprImpl::from(left_ref).cast_implicit(return_type).unwrap(); @@ -635,7 +633,7 @@ impl LogicalAggBuilder { return_type: right_return_type, inputs, distinct, - order_by_fields, + order_by, filter, }); @@ -681,7 +679,7 @@ impl LogicalAggBuilder { squared_input_expr.return_type(), )], distinct, - order_by_fields: order_by_fields.clone(), + order_by: order_by.clone(), filter: filter.clone(), })) .cast_implicit(return_type.clone()) @@ -696,7 +694,7 @@ impl LogicalAggBuilder { return_type: sum_return_type, inputs: inputs.clone(), distinct, - order_by_fields: order_by_fields.clone(), + order_by: order_by.clone(), filter: filter.clone(), })) .cast_implicit(return_type.clone()) @@ -711,7 +709,7 @@ impl LogicalAggBuilder { return_type: count_return_type, inputs, distinct, - order_by_fields, + order_by, filter, })); @@ -822,7 +820,7 @@ impl LogicalAggBuilder { return_type, inputs, distinct, - order_by_fields, + order_by, filter, }) .into()), @@ -1007,7 +1005,7 @@ impl LogicalAgg { agg_call.inputs.iter_mut().for_each(|i| { *i = InputRef::new(input_col_change.map(i.index()), i.return_type()) }); - agg_call.order_by_fields.iter_mut().for_each(|field| { + agg_call.order_by.iter_mut().for_each(|field| { let i = &mut field.input; *i = InputRef::new(input_col_change.map(i.index()), i.return_type()) }); @@ -1099,7 +1097,7 @@ impl ColPrunable for LogicalAgg { let index = index - self.group_key().len(); let agg_call = self.agg_calls()[index].clone(); tmp.extend(agg_call.inputs.iter().map(|x| x.index())); - tmp.extend(agg_call.order_by_fields.iter().map(|x| x.input.index())); + tmp.extend(agg_call.order_by.iter().map(|x| x.input.index())); // collect columns used in aggregate filter expressions for i in &agg_call.filter.conjunctions { tmp.union_with(&i.collect_input_refs(input_cnt)); @@ -1468,7 +1466,7 @@ mod tests { return_type: ty.clone(), inputs: vec![InputRef::new(2, ty.clone())], distinct: false, - order_by_fields: vec![], + order_by: vec![], filter: Condition::true_cond(), }; LogicalAgg::new(vec![agg_call], vec![1], values.into()) @@ -1587,7 +1585,7 @@ mod tests { return_type: ty.clone(), inputs: vec![InputRef::new(2, ty.clone())], distinct: false, - order_by_fields: vec![], + order_by: vec![], filter: Condition::true_cond(), }; let agg: PlanRef = LogicalAgg::new(vec![agg_call], vec![1], values.into()).into(); @@ -1652,7 +1650,7 @@ mod tests { return_type: ty.clone(), inputs: vec![InputRef::new(2, ty.clone())], distinct: false, - order_by_fields: vec![], + order_by: vec![], filter: Condition::true_cond(), }, PlanAggCall { @@ -1660,7 +1658,7 @@ mod tests { return_type: ty.clone(), inputs: vec![InputRef::new(1, ty.clone())], distinct: false, - order_by_fields: vec![], + order_by: 
vec![], filter: Condition::true_cond(), }, ]; diff --git a/src/frontend/src/optimizer/plan_node/stream.rs b/src/frontend/src/optimizer/plan_node/stream.rs index a764c3d7acf5a..b3b9ec8cb4c4d 100644 --- a/src/frontend/src/optimizer/plan_node/stream.rs +++ b/src/frontend/src/optimizer/plan_node/stream.rs @@ -14,6 +14,7 @@ use derivative::Derivative; use generic::PlanAggCall; +use itertools::Itertools; use pb::stream_node as pb_node; use risingwave_common::catalog::{ColumnDesc, Field, Schema}; use risingwave_common::types::DataType; @@ -225,23 +226,16 @@ impl HashJoin { pub fn infer_internal_and_degree_table_catalog( input: &impl StreamPlanRef, join_key_indices: Vec, + dk_indices_in_jk: Vec, ) -> (TableCatalog, TableCatalog, Vec) { let schema = input.schema(); - let internal_table_dist_keys = input.distribution().dist_column_indices().to_vec(); - - // Find the dist key position in join key. - // FIXME(yuhao): currently the dist key position is not the exact position mapped to the - // join key when there are duplicate value in join key indices. - let degree_table_dist_keys = internal_table_dist_keys + let internal_table_dist_keys = dk_indices_in_jk .iter() - .map(|idx| { - join_key_indices - .iter() - .position(|v| v == idx) - .expect("join key should contain dist key.") - }) - .collect(); + .map(|idx| join_key_indices[*idx]) + .collect_vec(); + + let degree_table_dist_keys = dk_indices_in_jk.clone(); // The pk of hash join internal and degree table should be join_key + input_pk. let join_key_len = join_key_indices.len(); @@ -287,6 +281,10 @@ impl HashJoin { internal_table_catalog_builder.set_read_prefix_len_hint(join_key_len); degree_table_catalog_builder.set_read_prefix_len_hint(join_key_len); + + internal_table_catalog_builder.set_dist_key_in_pk(dk_indices_in_jk.clone()); + degree_table_catalog_builder.set_dist_key_in_pk(dk_indices_in_jk); + ( internal_table_catalog_builder.build(internal_table_dist_keys), degree_table_catalog_builder.build(degree_table_dist_keys), @@ -501,7 +499,7 @@ pub fn to_stream_prost_body( left_table_id: left_table_desc.table_id.table_id(), right_table_id: right_table_desc.table_id.table_id(), left_info: Some(ArrangementInfo { - arrange_key_orders: left_table_desc.arrange_key_orders_prost(), + arrange_key_orders: left_table_desc.arrange_key_orders_protobuf(), column_descs: left_table .core .column_descs() @@ -511,7 +509,7 @@ pub fn to_stream_prost_body( table_desc: Some(left_table_desc.to_protobuf()), }), right_info: Some(ArrangementInfo { - arrange_key_orders: right_table_desc.arrange_key_orders_prost(), + arrange_key_orders: right_table_desc.arrange_key_orders_protobuf(), column_descs: right_table .core .column_descs() @@ -625,63 +623,8 @@ pub fn to_stream_prost_body( .collect(), }) } - Node::HashJoin(me) => { - let left_key_indices = me.eq_join_predicate.left_eq_indexes(); - let right_key_indices = me.eq_join_predicate.right_eq_indexes(); - let left_key_indices_prost = left_key_indices.iter().map(|&idx| idx as i32).collect(); - let right_key_indices_prost = right_key_indices.iter().map(|&idx| idx as i32).collect(); - - let (left_table, left_degree_table, left_deduped_input_pk_indices) = - HashJoin::infer_internal_and_degree_table_catalog( - &me.core.left.0, - left_key_indices, - ); - let (right_table, right_degree_table, right_deduped_input_pk_indices) = - HashJoin::infer_internal_and_degree_table_catalog( - &me.core.right.0, - right_key_indices, - ); - - let left_deduped_input_pk_indices = left_deduped_input_pk_indices - .iter() - .map(|idx| *idx as u32) - 
.collect(); - - let right_deduped_input_pk_indices = right_deduped_input_pk_indices - .iter() - .map(|idx| *idx as u32) - .collect(); - - let (left_table, left_degree_table) = ( - left_table.with_id(state.gen_table_id_wrapped()), - left_degree_table.with_id(state.gen_table_id_wrapped()), - ); - let (right_table, right_degree_table) = ( - right_table.with_id(state.gen_table_id_wrapped()), - right_degree_table.with_id(state.gen_table_id_wrapped()), - ); - - let null_safe_prost = me.eq_join_predicate.null_safes().into_iter().collect(); - - ProstNode::HashJoin(HashJoinNode { - join_type: me.core.join_type as i32, - left_key: left_key_indices_prost, - right_key: right_key_indices_prost, - null_safe: null_safe_prost, - condition: me - .eq_join_predicate - .other_cond() - .as_expr_unless_true() - .map(|x| x.to_expr_proto()), - left_table: Some(left_table.to_internal_table_prost()), - right_table: Some(right_table.to_internal_table_prost()), - left_degree_table: Some(left_degree_table.to_internal_table_prost()), - right_degree_table: Some(right_degree_table.to_internal_table_prost()), - left_deduped_input_pk_indices, - right_deduped_input_pk_indices, - output_indices: me.core.output_indices.iter().map(|&x| x as u32).collect(), - is_append_only: me.is_append_only, - }) + Node::HashJoin(_) => { + unreachable!(); } Node::HopWindow(me) => { let window_start_exprs = me diff --git a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs index e4171d5faa5a9..698ae0c3346d0 100644 --- a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs @@ -198,7 +198,7 @@ impl StreamNode for StreamDeltaJoin { right_table_id: right_table_desc.table_id.table_id(), left_info: Some(ArrangementInfo { // TODO: remove it - arrange_key_orders: left_table_desc.arrange_key_orders_prost(), + arrange_key_orders: left_table_desc.arrange_key_orders_protobuf(), // TODO: remove it column_descs: left_table .column_descs() @@ -209,7 +209,7 @@ impl StreamNode for StreamDeltaJoin { }), right_info: Some(ArrangementInfo { // TODO: remove it - arrange_key_orders: right_table_desc.arrange_key_orders_prost(), + arrange_key_orders: right_table_desc.arrange_key_orders_protobuf(), // TODO: remove it column_descs: right_table .column_descs() diff --git a/src/frontend/src/optimizer/plan_node/stream_hash_join.rs b/src/frontend/src/optimizer/plan_node/stream_hash_join.rs index 2f78c979be171..95395325fd19b 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hash_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hash_join.rs @@ -17,6 +17,7 @@ use std::fmt; use fixedbitset::FixedBitSet; use itertools::Itertools; use risingwave_common::catalog::{FieldDisplay, Schema}; +use risingwave_common::util::iter_util::ZipEqFast; use risingwave_pb::plan_common::JoinType; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::HashJoinNode; @@ -150,6 +151,29 @@ impl StreamHashJoin { pub fn to_delta_join(&self) -> StreamDeltaJoin { StreamDeltaJoin::new(self.logical.clone(), self.eq_join_predicate.clone()) } + + pub fn derive_dist_key_in_join_key(&self) -> Vec { + let left_dk_indices = self.left().distribution().dist_column_indices().to_vec(); + let right_dk_indices = self.right().distribution().dist_column_indices().to_vec(); + let left_jk_indices = self.eq_join_predicate.left_eq_indexes(); + let right_jk_indices = self.eq_join_predicate.right_eq_indexes(); + + assert_eq!(left_jk_indices.len(), 
right_jk_indices.len()); + + let mut dk_indices_in_jk = vec![]; + + for (l_dk_idx, r_dk_idx) in left_dk_indices.iter().zip_eq_fast(right_dk_indices.iter()) { + for dk_idx_in_jk in left_jk_indices.iter().positions(|idx| idx == l_dk_idx) { + if right_jk_indices[dk_idx_in_jk] == *r_dk_idx { + dk_indices_in_jk.push(dk_idx_in_jk); + break; + } + } + } + + assert_eq!(dk_indices_in_jk.len(), left_dk_indices.len()); + dk_indices_in_jk + } } impl fmt::Display for StreamHashJoin { @@ -231,24 +255,25 @@ impl_plan_tree_node_for_binary! { StreamHashJoin } impl StreamNode for StreamHashJoin { fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody { - let left_key_indices = self.eq_join_predicate.left_eq_indexes(); - let right_key_indices = self.eq_join_predicate.right_eq_indexes(); - let left_key_indices_prost = left_key_indices.iter().map(|idx| *idx as i32).collect_vec(); - let right_key_indices_prost = right_key_indices .iter() .map(|idx| *idx as i32) .collect_vec(); + let left_jk_indices = self.eq_join_predicate.left_eq_indexes(); + let right_jk_indices = self.eq_join_predicate.right_eq_indexes(); + let left_jk_indices_prost = left_jk_indices.iter().map(|idx| *idx as i32).collect_vec(); + let right_jk_indices_prost = right_jk_indices.iter().map(|idx| *idx as i32).collect_vec(); + + let dk_indices_in_jk = self.derive_dist_key_in_join_key(); use super::stream::HashJoin; let (left_table, left_degree_table, left_deduped_input_pk_indices) = HashJoin::infer_internal_and_degree_table_catalog( self.left().plan_base(), - left_key_indices, + left_jk_indices, + dk_indices_in_jk.clone(), ); let (right_table, right_degree_table, right_deduped_input_pk_indices) = HashJoin::infer_internal_and_degree_table_catalog( self.right().plan_base(), - right_key_indices, + right_jk_indices, + dk_indices_in_jk, ); let left_deduped_input_pk_indices = left_deduped_input_pk_indices @@ -274,8 +299,8 @@ impl StreamNode for StreamHashJoin { NodeBody::HashJoin(HashJoinNode { join_type: self.logical.join_type() as i32, - left_key: left_key_indices_prost, - right_key: right_key_indices_prost, + left_key: left_jk_indices_prost, + right_key: right_jk_indices_prost, null_safe: null_safe_prost, condition: self .eq_join_predicate diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index 58d1dcfce7add..22e281becebed 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -198,6 +198,7 @@ impl StreamMaterialize { read_prefix_len_hint, version, watermark_columns, + dist_key_in_pk: vec![], }) } diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs index 80150420bac4f..4a14bfb57f1b4 100644 --- a/src/frontend/src/optimizer/plan_node/utils.rs +++ b/src/frontend/src/optimizer/plan_node/utils.rs @@ -36,6 +36,7 @@ pub struct TableCatalogBuilder { column_names: HashMap<String, i32>, read_prefix_len_hint: usize, watermark_columns: Option<FixedBitSet>, + dist_key_in_pk: Option<Vec<usize>>, } /// For DRY, mainly used for construct internal table catalog in stateful streaming executors. @@ -98,6 +99,10 @@ impl TableCatalogBuilder { self.watermark_columns = Some(watermark_columns); } + pub fn set_dist_key_in_pk(&mut self, dist_key_in_pk: Vec<usize>) { + self.dist_key_in_pk = Some(dist_key_in_pk); + } + /// Check whether the column name already exists. If so, record the occurrence and rename the column /// to avoid duplicates.
fn avoid_duplicate_col_name(&mut self, column_desc: &mut ColumnDesc) { @@ -149,6 +154,7 @@ impl TableCatalogBuilder { read_prefix_len_hint: self.read_prefix_len_hint, version: None, // the internal table is not versioned and can't be schema changed watermark_columns, + dist_key_in_pk: self.dist_key_in_pk.unwrap_or(vec![]), } } diff --git a/src/frontend/src/optimizer/property/order.rs b/src/frontend/src/optimizer/property/order.rs index f5a09c18a5c20..1717cbce0759a 100644 --- a/src/frontend/src/optimizer/property/order.rs +++ b/src/frontend/src/optimizer/property/order.rs @@ -19,7 +19,7 @@ use parse_display::Display; use risingwave_common::catalog::{FieldDisplay, Schema}; use risingwave_common::error::Result; use risingwave_common::util::sort_util::{OrderPair, OrderType}; -use risingwave_pb::plan_common::{ColumnOrder, OrderType as ProstOrderType}; +use risingwave_pb::common::{PbColumnOrder, PbDirection, PbOrderType}; use super::super::plan_node::*; use crate::optimizer::PlanRef; @@ -34,7 +34,7 @@ impl Order { Self { field_order } } - pub fn to_protobuf(&self) -> Vec { + pub fn to_protobuf(&self) -> Vec { self.field_order .iter() .map(FieldOrder::to_protobuf) @@ -144,22 +144,23 @@ impl FieldOrder { } } - pub fn to_protobuf(&self) -> ColumnOrder { - ColumnOrder { - order_type: self.direct.to_protobuf() as i32, - index: self.index as u32, + pub fn to_protobuf(&self) -> PbColumnOrder { + PbColumnOrder { + column_index: self.index as _, + order_type: Some(PbOrderType { + direction: self.direct.to_protobuf() as _, + }), } } - pub fn from_protobuf(column_order: &ColumnOrder) -> Self { - let order_type: ProstOrderType = ProstOrderType::from_i32(column_order.order_type).unwrap(); + pub fn from_protobuf(column_order: &PbColumnOrder) -> Self { Self { - direct: Direction::from_protobuf(&order_type), - index: column_order.index as usize, + index: column_order.column_index as _, + direct: Direction::from_protobuf(&column_order.get_order_type().unwrap().direction()), } } - // TODO: unify them + // TODO(rc): unify them pub fn to_order_pair(&self) -> OrderPair { OrderPair { column_idx: self.index, @@ -193,23 +194,23 @@ impl From for OrderType { } impl Direction { - pub fn to_protobuf(self) -> ProstOrderType { + pub fn to_protobuf(self) -> PbDirection { match self { - Self::Asc => ProstOrderType::Ascending, - Self::Desc => ProstOrderType::Descending, + Self::Asc => PbDirection::Ascending, + Self::Desc => PbDirection::Descending, _ => unimplemented!(), } } - pub fn from_protobuf(order_type: &ProstOrderType) -> Self { + pub fn from_protobuf(order_type: &PbDirection) -> Self { match order_type { - ProstOrderType::Ascending => Self::Asc, - ProstOrderType::Descending => Self::Desc, - ProstOrderType::OrderUnspecified => unreachable!(), + PbDirection::Ascending => Self::Asc, + PbDirection::Descending => Self::Desc, + PbDirection::Unspecified => unreachable!(), } } - // TODO: unify them + // TODO(rc): unify them pub fn to_order(self) -> OrderType { match self { Self::Asc => OrderType::Ascending, diff --git a/src/frontend/src/optimizer/rule/apply_agg_transpose_rule.rs b/src/frontend/src/optimizer/rule/apply_agg_transpose_rule.rs index a695b0a7e15db..1c25ba4c7b8c5 100644 --- a/src/frontend/src/optimizer/rule/apply_agg_transpose_rule.rs +++ b/src/frontend/src/optimizer/rule/apply_agg_transpose_rule.rs @@ -128,7 +128,7 @@ impl Rule for ApplyAggTransposeRule { input_ref.shift_with_offset(offset); }); agg_call - .order_by_fields + .order_by .iter_mut() .for_each(|o| o.input.shift_with_offset(offset)); agg_call.filter = 
agg_call.filter.clone().rewrite_expr(&mut shift_index); diff --git a/src/frontend/src/optimizer/rule/min_max_on_index_rule.rs b/src/frontend/src/optimizer/rule/min_max_on_index_rule.rs index 02819856c0a72..375d481ef1246 100644 --- a/src/frontend/src/optimizer/rule/min_max_on_index_rule.rs +++ b/src/frontend/src/optimizer/rule/min_max_on_index_rule.rs @@ -50,7 +50,7 @@ impl Rule for MinMaxOnIndexRule { if matches!(first_call.agg_kind, AggKind::Min | AggKind::Max) && !first_call.distinct && first_call.filter.always_true() - && first_call.order_by_fields.is_empty() + && first_call.order_by.is_empty() { let logical_scan: LogicalScan = logical_agg.input().as_logical_scan()?.to_owned(); let kind = calls.first()?.agg_kind; @@ -161,7 +161,7 @@ impl MinMaxOnIndexRule { 0, logical_agg.schema().fields[0].data_type.clone(), )], - order_by_fields: vec![], + order_by: vec![], distinct: false, filter: Condition { conjunctions: vec![], @@ -220,7 +220,7 @@ impl MinMaxOnIndexRule { 0, logical_agg.schema().fields[0].data_type.clone(), )], - order_by_fields: vec![], + order_by: vec![], distinct: false, filter: Condition { conjunctions: vec![], diff --git a/src/java_binding/run_demo.sh b/src/java_binding/run_demo.sh index 27c82fe2b19c5..72bcc9642b3c0 100644 --- a/src/java_binding/run_demo.sh +++ b/src/java_binding/run_demo.sh @@ -15,18 +15,18 @@ INSERT INTO ${TABLE_NAME} values ${INSERT_DATA}; FLUSH; EOF -#set -x -# -#cd ${RISINGWAVE_ROOT}/java -# -#mvn exec:exec \ -# -pl java-binding-integration-test \ -# -Dexec.executable=java \ -# -Dexec.args=" \ -# -cp %classpath:java-binding/target*.jar:proto/target/*.jar \ -# -Djava.library.path=${RISINGWAVE_ROOT}/target/debug \ -# com.risingwave.java.binding.Demo" -# -#psql -d dev -h localhost -p 4566 -U root << EOF -#DROP TABLE ${TABLE_NAME}; -#EOF \ No newline at end of file +set -x + +cd ${RISINGWAVE_ROOT}/java + +mvn exec:exec \ + -pl java-binding-integration-test \ + -Dexec.executable=java \ + -Dexec.args=" \ + -cp %classpath:java-binding/target*.jar:proto/target/*.jar \ + -Djava.library.path=${RISINGWAVE_ROOT}/target/debug \ + com.risingwave.java.binding.Demo" + +psql -d dev -h localhost -p 4566 -U root << EOF +DROP TABLE ${TABLE_NAME}; +EOF \ No newline at end of file diff --git a/src/meta/src/stream/test_fragmenter.rs b/src/meta/src/stream/test_fragmenter.rs index 1bc5aec7f0085..b57c106139509 100644 --- a/src/meta/src/stream/test_fragmenter.rs +++ b/src/meta/src/stream/test_fragmenter.rs @@ -19,14 +19,14 @@ use std::vec; use itertools::Itertools; use risingwave_common::catalog::{DatabaseId, SchemaId, TableId}; use risingwave_pb::catalog::Table as ProstTable; -use risingwave_pb::common::{ParallelUnit, WorkerNode}; +use risingwave_pb::common::{ParallelUnit, PbColumnOrder, PbDirection, PbOrderType, WorkerNode}; use risingwave_pb::data::data_type::TypeName; use risingwave_pb::data::DataType; use risingwave_pb::expr::agg_call::Type; use risingwave_pb::expr::expr_node::RexNode; use risingwave_pb::expr::expr_node::Type::{Add, GreaterThan, InputRef}; use risingwave_pb::expr::{AggCall, ExprNode, FunctionCall, InputRef as ProstInputRef}; -use risingwave_pb::plan_common::{ColumnCatalog, ColumnDesc, ColumnOrder, Field, OrderType}; +use risingwave_pb::plan_common::{ColumnCatalog, ColumnDesc, Field}; use risingwave_pb::stream_plan::stream_fragment_graph::{StreamFragment, StreamFragmentEdge}; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{ @@ -68,7 +68,7 @@ fn make_sum_aggcall(idx: u32) -> AggCall { ..Default::default() }), distinct: 
false, - order_by_fields: vec![], + order_by: vec![], filter: None, } } @@ -91,10 +91,12 @@ fn make_field(type_name: TypeName) -> Field { } } -fn make_column_order(index: u32) -> ColumnOrder { - ColumnOrder { - order_type: OrderType::Ascending as i32, - index, +fn make_column_order(column_index: u32) -> PbColumnOrder { + PbColumnOrder { + column_index, + order_type: Some(PbOrderType { + direction: PbDirection::Ascending as _, + }), } } @@ -123,9 +125,11 @@ fn make_source_internal_table(id: u32) -> ProstTable { database_id: DatabaseId::placeholder().database_id, name: String::new(), columns, - pk: vec![ColumnOrder { - index: 0, - order_type: 2, + pk: vec![PbColumnOrder { + column_index: 0, + order_type: Some(PbOrderType { + direction: PbDirection::Descending as _, + }), }], ..Default::default() } @@ -142,9 +146,11 @@ fn make_internal_table(id: u32, is_agg_value: bool) -> ProstTable { database_id: DatabaseId::placeholder().database_id, name: String::new(), columns, - pk: vec![ColumnOrder { - index: 0, - order_type: 2, + pk: vec![PbColumnOrder { + column_index: 0, + order_type: Some(PbOrderType { + direction: PbDirection::Descending as _, + }), }], stream_key: vec![2], ..Default::default() diff --git a/src/prost/build.rs b/src/prost/build.rs index 78c81874fd30b..bdf1702f56712 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -59,7 +59,7 @@ fn main() -> Result<(), Box> { .type_attribute("expr.ExprNode", "#[derive(Eq, Hash)]") .type_attribute("data.DataType", "#[derive(Eq, Hash)]") .type_attribute("expr.ExprNode.rex_node", "#[derive(Eq, Hash)]") - .type_attribute("expr.InputRefExpr", "#[derive(Eq, Hash)]") + .type_attribute("expr.InputRef", "#[derive(Eq, Hash)]") .type_attribute("data.Datum", "#[derive(Eq, Hash)]") .type_attribute("expr.FunctionCall", "#[derive(Eq, Hash)]") .type_attribute("expr.UserDefinedFunction", "#[derive(Eq, Hash)]") diff --git a/src/prost/helpers/src/lib.rs b/src/prost/helpers/src/lib.rs index 99a95fdf88976..9ab473e40186c 100644 --- a/src/prost/helpers/src/lib.rs +++ b/src/prost/helpers/src/lib.rs @@ -17,7 +17,7 @@ use proc_macro::TokenStream; use proc_macro2::TokenStream as TokenStream2; use proc_macro_error::{proc_macro_error, ResultExt}; -use quote::quote; +use quote::{format_ident, quote}; use syn::{DataStruct, DeriveInput}; mod generate; @@ -44,7 +44,7 @@ fn produce(ast: &DeriveInput) -> TokenStream2 { let name = &ast.ident; // Is it a struct? - if let syn::Data::Struct(DataStruct { ref fields, .. }) = ast.data { + let struct_get = if let syn::Data::Struct(DataStruct { ref fields, .. }) = ast.data { let generated = fields.iter().map(generate::implement); quote! { impl #name { @@ -54,5 +54,20 @@ fn produce(ast: &DeriveInput) -> TokenStream2 { } else { // Do nothing. quote! {} + }; + + // Add a `Pb`-prefixed alias for all types. + let pb_alias = { + let pb_name = format_ident!("Pb{name}"); + let doc = format!("Alias for [`{name}`]."); + quote! { + #[doc = #doc] + pub type #pb_name = #name; + } + }; + + quote! { + #pb_alias + #struct_get } } diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index 32e8118a8dc28..2b6654d0cc4df 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -1971,9 +1971,15 @@ impl Parser { // parse optional column list (schema) and watermarks on source. 
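// NOTE: the `prost/helpers` change a few hunks up is what makes the `PbColumnDesc`,
// `PbColumnOrder`, etc. spellings used throughout this diff resolve: for every
// generated message type, the derive macro now also emits an alias of the form
//
//     /// Alias for [`ColumnDesc`].
//     pub type PbColumnDesc = ColumnDesc;
//
// so call sites can uniformly refer to protobuf types with a `Pb` prefix.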
let (columns, constraints, source_watermarks) = self.parse_columns_with_watermark()?; - let append_only = if cfg!(debug_assertions) && self.parse_keyword(Keyword::APPEND) { + let append_only = if self.parse_keyword(Keyword::APPEND) { self.expect_keyword(Keyword::ONLY)?; - true + if cfg!(debug_assertions) { + true + } else { + return Err(ParserError::ParserError( + "APPEND ONLY is only allowed in debug mode".to_string(), + )); + } } else { false }; diff --git a/src/storage/hummock_sdk/src/filter_key_extractor.rs b/src/storage/hummock_sdk/src/filter_key_extractor.rs index 3e89e2af010bb..1d9817646a58e 100644 --- a/src/storage/hummock_sdk/src/filter_key_extractor.rs +++ b/src/storage/hummock_sdk/src/filter_key_extractor.rs @@ -159,7 +159,7 @@ impl SchemaFilterKeyExtractor { let pk_indices: Vec<usize> = table_catalog .pk .iter() - .map(|col_order| col_order.index as usize) + .map(|col_order| col_order.column_index as usize) .collect(); let read_prefix_len = table_catalog.get_read_prefix_len_hint() as usize; @@ -174,9 +174,7 @@ impl SchemaFilterKeyExtractor { .pk .iter() .map(|col_order| { - OrderType::from_prost( - &risingwave_pb::plan_common::OrderType::from_i32(col_order.order_type).unwrap(), - ) + OrderType::from_protobuf(&col_order.get_order_type().unwrap().direction()) }) .collect(); @@ -352,7 +350,8 @@ mod tests { use risingwave_common::util::sort_util::OrderType; use risingwave_pb::catalog::table::TableType; use risingwave_pb::catalog::Table as ProstTable; - use risingwave_pb::plan_common::{ColumnCatalog as ProstColumnCatalog, ColumnOrder}; + use risingwave_pb::common::{PbColumnOrder, PbDirection, PbOrderType}; + use risingwave_pb::plan_common::ColumnCatalog as ProstColumnCatalog; use tokio::task; use super::{DummyFilterKeyExtractor, FilterKeyExtractor, SchemaFilterKeyExtractor}; @@ -438,13 +437,17 @@ mod tests { }, ], pk: vec![ - ColumnOrder { - order_type: 1, // Ascending - index: 1, + PbColumnOrder { + column_index: 1, + order_type: Some(PbOrderType { + direction: PbDirection::Ascending as _, + }), }, - ColumnOrder { - order_type: 1, // Ascending - index: 3, + PbColumnOrder { + column_index: 3, + order_type: Some(PbOrderType { + direction: PbDirection::Ascending as _, + }), }, ], stream_key: vec![0], @@ -466,6 +469,7 @@ mod tests { read_prefix_len_hint: 1, version: None, watermark_indices: vec![], + dist_key_in_pk: vec![], } } diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 6bb15825e9849..2bbecf7cef072 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -159,9 +159,7 @@ where .pk .iter() .map(|col_order| { - OrderType::from_prost( - &risingwave_pb::plan_common::OrderType::from_i32(col_order.order_type).unwrap(), - ) + OrderType::from_protobuf(&col_order.get_order_type().unwrap().direction()) }) .collect(); let dist_key_indices: Vec<usize> = table_catalog @@ -173,10 +171,20 @@ let pk_indices = table_catalog .pk .iter() - .map(|col_order| col_order.index as usize) + .map(|col_order| col_order.column_index as usize) .collect_vec(); - let dist_key_in_pk_indices = get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices); + // FIXME(yuhao): only use `dist_key_in_pk` in the proto + let dist_key_in_pk_indices = if table_catalog.get_dist_key_in_pk().is_empty() { + get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices) + } else { + table_catalog + .get_dist_key_in_pk() + .iter() + .map(|idx| *idx as usize) + .collect() + }; + let table_option =
TableOption::build_table_option(table_catalog.get_properties()); let local_state_store = store .new_local(NewLocalOptions { diff --git a/src/stream/src/common/table/test_utils.rs b/src/stream/src/common/table/test_utils.rs index 18ddf487646ff..426a254e639ed 100644 --- a/src/stream/src/common/table/test_utils.rs +++ b/src/stream/src/common/table/test_utils.rs @@ -17,7 +17,8 @@ use risingwave_common::catalog::{ColumnDesc, TableId}; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::sort_util::OrderType; use risingwave_pb::catalog::Table as ProstTable; -use risingwave_pb::plan_common::{ColumnCatalog, ColumnOrder}; +use risingwave_pb::common::{PbColumnOrder, PbOrderType}; +use risingwave_pb::plan_common::ColumnCatalog; pub(crate) fn gen_prost_table( table_id: TableId, @@ -48,9 +49,11 @@ pub(crate) fn gen_prost_table_with_value_indices( let prost_pk = pk_index .iter() .zip_eq_fast(order_types.iter()) - .map(|(idx, order)| ColumnOrder { - index: *idx as u32, - order_type: order.to_prost() as i32, + .map(|(idx, order)| PbColumnOrder { + column_index: *idx as _, + order_type: Some(PbOrderType { + direction: order.to_protobuf() as _, + }), }) .collect(); let prost_columns = column_descs diff --git a/src/stream/src/executor/now.rs b/src/stream/src/executor/now.rs index baf7872ae5911..5e489589d059a 100644 --- a/src/stream/src/executor/now.rs +++ b/src/stream/src/executor/now.rs @@ -25,7 +25,7 @@ use tokio::sync::mpsc::UnboundedReceiver; use super::{ Barrier, BoxedMessageStream, Executor, Message, Mutation, PkIndices, PkIndicesRef, - StreamExecutorError, + StreamExecutorError, Watermark, }; use crate::common::table::state_table::StateTable; @@ -122,12 +122,11 @@ impl NowExecutor { yield Message::Chunk(stream_chunk); - // TODO: depends on "https://github.com/risingwavelabs/risingwave/issues/6042" - // yield Message::Watermark(Watermark::new( - // 0, - // DataType::TIMESTAMPTZ, - // timestamp.as_ref().unwrap().clone(), - // )); + yield Message::Watermark(Watermark::new( + 0, + DataType::Timestamptz, + timestamp.as_ref().unwrap().clone(), + )); if last_timestamp.is_some() { state_table.delete(row::once(last_timestamp)); @@ -176,14 +175,14 @@ mod tests { use risingwave_common::array::StreamChunk; use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId}; use risingwave_common::test_prelude::StreamChunkTestExt; - use risingwave_common::types::DataType; + use risingwave_common::types::{DataType, ScalarImpl}; use risingwave_common::util::sort_util::OrderType; use risingwave_storage::memory::MemoryStateStore; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; use super::NowExecutor; use crate::common::table::state_table::StateTable; - use crate::executor::{Barrier, BoxedMessageStream, Executor, PkIndices}; + use crate::executor::{Barrier, BoxedMessageStream, Executor, Message, PkIndices, Watermark}; #[tokio::test] async fn test_now() { @@ -211,16 +210,16 @@ mod tests { ); // Consume the watermark - // let watermark = now_executor.next().await.unwrap().unwrap(); - // - // assert_eq!( - // watermark, - // Message::Watermark(Watermark::new( - // 0, - // DataType::TIMESTAMPTZ, - // ScalarImpl::Int64(1617235200001000) - // )) - // ); + let watermark = now_executor.next().await.unwrap().unwrap(); + + assert_eq!( + watermark, + Message::Watermark(Watermark::new( + 0, + DataType::Timestamptz, + ScalarImpl::Int64(1617235200001000) + )) + ); // Consume the barrier now_executor.next().await.unwrap().unwrap(); @@ -241,16 +240,16 @@ mod tests { ); // Consume the watermark - // 
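Before the second watermark assertion, a note on the scalar values in this test: 1617235200000000 is 2021-04-01T00:00:00Z in microseconds, so the two watermarks are that instant plus 1 ms and 2 ms. Assuming, as the 1000x scaling suggests, that the `Timestamptz` scalar's i64 form counts microseconds since the Unix epoch, the expected value derives from the barrier time like this (a sketch; the variable names are illustrative):

    fn main() {
        let barrier_time_ms: i64 = 1617235200001; // 2021-04-01T00:00:00.001Z
        let watermark_us = barrier_time_ms * 1000; // assumed microsecond resolution of Timestamptz
        assert_eq!(watermark_us, 1617235200001000);
    }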
let watermark = now_executor.next().await.unwrap().unwrap(); - // - // assert_eq!( - // watermark, - // Message::Watermark(Watermark::new( - // 0, - // DataType::TIMESTAMPTZ, - // ScalarImpl::Int64(1617235200002000) - // )) - // ); + let watermark = now_executor.next().await.unwrap().unwrap(); + + assert_eq!( + watermark, + Message::Watermark(Watermark::new( + 0, + DataType::Timestamptz, + ScalarImpl::Int64(1617235200002000) + )) + ); // Consume the barrier now_executor.next().await.unwrap().unwrap(); diff --git a/src/stream/src/executor/source/state_table_handler.rs b/src/stream/src/executor/source/state_table_handler.rs index 697ad038d75a1..0ceae9e36a6f0 100644 --- a/src/stream/src/executor/source/state_table_handler.rs +++ b/src/stream/src/executor/source/state_table_handler.rs @@ -28,9 +28,10 @@ use risingwave_connector::source::{SplitId, SplitImpl, SplitMetaData}; use risingwave_hummock_sdk::key::next_key; use risingwave_pb::catalog::table::TableType; use risingwave_pb::catalog::Table as ProstTable; +use risingwave_pb::common::{PbColumnOrder, PbDirection, PbOrderType}; use risingwave_pb::data::data_type::TypeName; use risingwave_pb::data::DataType; -use risingwave_pb::plan_common::{ColumnCatalog, ColumnDesc, ColumnOrder}; +use risingwave_pb::plan_common::{ColumnCatalog, ColumnDesc}; use risingwave_storage::store::PrefetchOptions; use risingwave_storage::StateStore; @@ -223,9 +224,11 @@ pub fn default_source_internal_table(id: u32) -> ProstTable { columns, table_type: TableType::Internal as i32, value_indices: vec![0, 1], - pk: vec![ColumnOrder { - index: 0, - order_type: 1, + pk: vec![PbColumnOrder { + column_index: 0, + order_type: Some(PbOrderType { + direction: PbDirection::Ascending as _, + }), }], ..Default::default() } } diff --git a/src/stream/src/from_proto/agg_common.rs b/src/stream/src/from_proto/agg_common.rs index 75c28623c57db..bf409a699a410 100644 --- a/src/stream/src/from_proto/agg_common.rs +++ b/src/stream/src/from_proto/agg_common.rs @@ -21,7 +21,6 @@ use risingwave_common::buffer::Bitmap; use risingwave_common::types::DataType; use risingwave_common::util::sort_util::{OrderPair, OrderType}; use risingwave_expr::expr::{build_from_prost, AggKind}; -use risingwave_pb::plan_common::OrderType as ProstOrderType; use super::*; use crate::common::table::state_table::StateTable; @@ -48,12 +47,12 @@ pub fn build_agg_call_from_prost( _ => bail!("Too many/few arguments for {:?}", agg_kind), }; let order_pairs = agg_call_proto - .get_order_by_fields() + .get_order_by() .iter() - .map(|field| { - let col_idx = field.get_input() as usize; + .map(|col_order| { + let col_idx = col_order.get_column_index() as usize; let order_type = - OrderType::from_prost(&ProstOrderType::from_i32(field.direction).unwrap()); + OrderType::from_protobuf(&col_order.get_order_type().unwrap().direction()); // TODO(yuchao): `nulls first/last` is not supported yet, so it's ignored here, // see also `risingwave_common::util::sort_util::compare_values` OrderPair::new(col_idx, order_type) diff --git a/src/stream/src/from_proto/batch_query.rs b/src/stream/src/from_proto/batch_query.rs index 891a1e7cbbf97..96cf1d73ba2a6 100644 --- a/src/stream/src/from_proto/batch_query.rs +++ b/src/stream/src/from_proto/batch_query.rs @@ -15,7 +15,7 @@ use itertools::Itertools; use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId, TableOption}; use risingwave_common::util::sort_util::OrderType; -use risingwave_pb::plan_common::{OrderType as ProstOrderType, StorageTableDesc}; +use
risingwave_pb::plan_common::StorageTableDesc; use risingwave_pb::stream_plan::BatchPlanNode; use risingwave_storage::table::batch_table::storage_table::StorageTable; use risingwave_storage::table::Distribution; @@ -44,7 +44,7 @@ impl ExecutorBuilder for BatchQueryExecutorBuilder { let order_types = table_desc .pk .iter() - .map(|desc| OrderType::from_prost(&ProstOrderType::from_i32(desc.order_type).unwrap())) + .map(|desc| OrderType::from_protobuf(&desc.get_order_type().unwrap().direction())) .collect_vec(); let column_descs = table_desc @@ -60,7 +60,11 @@ impl ExecutorBuilder for BatchQueryExecutorBuilder { .collect(); // Use indices based on full table instead of streaming executor output. - let pk_indices = table_desc.pk.iter().map(|k| k.index as usize).collect_vec(); + let pk_indices = table_desc + .pk + .iter() + .map(|k| k.column_index as usize) + .collect_vec(); let dist_key_indices = table_desc .dist_key_indices diff --git a/src/stream/src/from_proto/chain.rs b/src/stream/src/from_proto/chain.rs index a83c462b4591a..457f346b2846a 100644 --- a/src/stream/src/from_proto/chain.rs +++ b/src/stream/src/from_proto/chain.rs @@ -14,7 +14,7 @@ use risingwave_common::catalog::{ColumnDesc, TableId, TableOption}; use risingwave_common::util::sort_util::OrderType; -use risingwave_pb::plan_common::{OrderType as ProstOrderType, StorageTableDesc}; +use risingwave_pb::plan_common::StorageTableDesc; use risingwave_pb::stream_plan::{ChainNode, ChainType}; use risingwave_storage::table::batch_table::storage_table::StorageTable; use risingwave_storage::table::Distribution; @@ -91,7 +91,7 @@ impl ExecutorBuilder for ChainExecutorBuilder { .pk .iter() .map(|desc| { - OrderType::from_prost(&ProstOrderType::from_i32(desc.order_type).unwrap()) + OrderType::from_protobuf(&desc.get_order_type().unwrap().direction()) }) .collect_vec(); @@ -103,7 +103,11 @@ impl ExecutorBuilder for ChainExecutorBuilder { let column_ids = column_descs.iter().map(|x| x.column_id).collect_vec(); // Use indices based on full table instead of streaming executor output. 
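Each of the executor-builder hunks in this stretch applies the same two mechanical substitutions: `ColumnOrder` moved from `plan_common` to `common` with `index` renamed to `column_index` and the flat `order_type: i32` replaced by a nested `PbOrderType` holding a `PbDirection`; correspondingly, `OrderType::from_prost(&ProstOrderType::from_i32(..).unwrap())` collapses to `OrderType::from_protobuf(&col_order.get_order_type().unwrap().direction())`, and `OrderPair::from_prost` becomes `OrderPair::from_protobuf`. A sketch of building the new proto shape, mirroring `make_column_order` from the test_fragmenter hunk earlier:

    use risingwave_pb::common::{PbColumnOrder, PbDirection, PbOrderType};

    fn ascending_on(column_index: u32) -> PbColumnOrder {
        PbColumnOrder {
            column_index,
            order_type: Some(PbOrderType {
                direction: PbDirection::Ascending as _,
            }),
        }
    }

The pk-index rewrite that the comment above refers to continues below.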
- let pk_indices = table_desc.pk.iter().map(|k| k.index as usize).collect_vec(); + let pk_indices = table_desc + .pk + .iter() + .map(|k| k.column_index as usize) + .collect_vec(); let dist_key_indices = table_desc .dist_key_indices diff --git a/src/stream/src/from_proto/group_top_n.rs b/src/stream/src/from_proto/group_top_n.rs index 6d287f2a464e5..3284da6f32c99 100644 --- a/src/stream/src/from_proto/group_top_n.rs +++ b/src/stream/src/from_proto/group_top_n.rs @@ -44,13 +44,17 @@ impl ExecutorBuilder for GroupTopNExecutorBuilder { let table = node.get_table()?; let vnodes = params.vnode_bitmap.map(Arc::new); let state_table = StateTable::from_table_catalog(table, store, vnodes).await; - let storage_key = table.get_pk().iter().map(OrderPair::from_prost).collect(); + let storage_key = table + .get_pk() + .iter() + .map(OrderPair::from_protobuf) + .collect(); let [input]: [_; 1] = params.input.try_into().unwrap(); let group_key_types = group_by .iter() .map(|i| input.schema()[*i].data_type()) .collect(); - let order_by = node.order_by.iter().map(OrderPair::from_prost).collect(); + let order_by = node.order_by.iter().map(OrderPair::from_protobuf).collect(); assert_eq!(¶ms.pk_indices, input.pk_indices()); let args = GroupTopNExecutorDispatcherArgs { diff --git a/src/stream/src/from_proto/group_top_n_appendonly.rs b/src/stream/src/from_proto/group_top_n_appendonly.rs index f7093bd1cdbd6..1d8d4137cd9a9 100644 --- a/src/stream/src/from_proto/group_top_n_appendonly.rs +++ b/src/stream/src/from_proto/group_top_n_appendonly.rs @@ -58,13 +58,17 @@ impl ExecutorBuilder for AppendOnlyGroupTopNExecutorBuilder { let table = node.get_table()?; let vnodes = params.vnode_bitmap.map(Arc::new); let state_table = StateTable::from_table_catalog(table, store, vnodes).await; - let storage_key = table.get_pk().iter().map(OrderPair::from_prost).collect(); + let storage_key = table + .get_pk() + .iter() + .map(OrderPair::from_protobuf) + .collect(); let [input]: [_; 1] = params.input.try_into().unwrap(); let group_key_types = group_by .iter() .map(|i| input.schema()[*i].data_type()) .collect(); - let order_by = node.order_by.iter().map(OrderPair::from_prost).collect(); + let order_by = node.order_by.iter().map(OrderPair::from_protobuf).collect(); assert_eq!(¶ms.pk_indices, input.pk_indices()); let args = AppendOnlyGroupTopNExecutorDispatcherArgs { diff --git a/src/stream/src/from_proto/lookup.rs b/src/stream/src/from_proto/lookup.rs index dfcad316584ca..6330c81bc6cf6 100644 --- a/src/stream/src/from_proto/lookup.rs +++ b/src/stream/src/from_proto/lookup.rs @@ -14,7 +14,7 @@ use risingwave_common::catalog::{ColumnDesc, TableId, TableOption}; use risingwave_common::util::sort_util::{OrderPair, OrderType}; -use risingwave_pb::plan_common::{OrderType as ProstOrderType, StorageTableDesc}; +use risingwave_pb::plan_common::StorageTableDesc; use risingwave_pb::stream_plan::LookupNode; use risingwave_storage::table::batch_table::storage_table::StorageTable; use risingwave_storage::table::Distribution; @@ -42,7 +42,7 @@ impl ExecutorBuilder for LookupExecutorBuilder { .get_arrangement_table_info()? 
.arrange_key_orders .iter() - .map(OrderPair::from_prost) + .map(OrderPair::from_protobuf) .collect(); let arrangement_col_descs = lookup @@ -65,7 +65,7 @@ impl ExecutorBuilder for LookupExecutorBuilder { let order_types = table_desc .pk .iter() - .map(|desc| OrderType::from_prost(&ProstOrderType::from_i32(desc.order_type).unwrap())) + .map(|desc| OrderType::from_protobuf(&desc.get_order_type().unwrap().direction())) .collect_vec(); let column_descs = table_desc @@ -76,7 +76,11 @@ impl ExecutorBuilder for LookupExecutorBuilder { let column_ids = column_descs.iter().map(|x| x.column_id).collect_vec(); // Use indices based on full table instead of streaming executor output. - let pk_indices = table_desc.pk.iter().map(|k| k.index as usize).collect_vec(); + let pk_indices = table_desc + .pk + .iter() + .map(|k| k.column_index as usize) + .collect_vec(); let dist_key_indices = table_desc .dist_key_indices diff --git a/src/stream/src/from_proto/mview.rs b/src/stream/src/from_proto/mview.rs index 306a80026a795..a4ef9e5c03756 100644 --- a/src/stream/src/from_proto/mview.rs +++ b/src/stream/src/from_proto/mview.rs @@ -38,7 +38,7 @@ impl ExecutorBuilder for MaterializeExecutorBuilder { let order_key = node .column_orders .iter() - .map(OrderPair::from_prost) + .map(OrderPair::from_protobuf) .collect(); let table = node.get_table()?; @@ -90,7 +90,7 @@ impl ExecutorBuilder for ArrangeExecutorBuilder { .get_table_info()? .arrange_key_orders .iter() - .map(OrderPair::from_prost) + .map(OrderPair::from_protobuf) .collect(); let table = node.get_table()?; diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index 2a185ff888405..ee7dc0628ba60 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -39,7 +39,7 @@ impl ExecutorBuilder for SinkExecutorBuilder { let pk_indices = sink_desc .pk .iter() - .map(|pk| pk.index as usize) + .map(|pk| pk.column_index as usize) .collect::>(); let schema = sink_desc.columns.iter().map(Into::into).collect(); // This field can be used to distinguish a specific actor in parallelism to prevent diff --git a/src/stream/src/from_proto/top_n.rs b/src/stream/src/from_proto/top_n.rs index 6b80f17c208b9..757fa2b7b6493 100644 --- a/src/stream/src/from_proto/top_n.rs +++ b/src/stream/src/from_proto/top_n.rs @@ -38,8 +38,12 @@ impl ExecutorBuilder for TopNExecutorNewBuilder { let table = node.get_table()?; let vnodes = params.vnode_bitmap.map(Arc::new); let state_table = StateTable::from_table_catalog(table, store, vnodes).await; - let storage_key = table.get_pk().iter().map(OrderPair::from_prost).collect(); - let order_by = node.order_by.iter().map(OrderPair::from_prost).collect(); + let storage_key = table + .get_pk() + .iter() + .map(OrderPair::from_protobuf) + .collect(); + let order_by = node.order_by.iter().map(OrderPair::from_protobuf).collect(); assert_eq!(¶ms.pk_indices, input.pk_indices()); if node.with_ties { diff --git a/src/stream/src/from_proto/top_n_appendonly.rs b/src/stream/src/from_proto/top_n_appendonly.rs index 3f23dc690a28f..f4d06b8b97601 100644 --- a/src/stream/src/from_proto/top_n_appendonly.rs +++ b/src/stream/src/from_proto/top_n_appendonly.rs @@ -38,8 +38,12 @@ impl ExecutorBuilder for AppendOnlyTopNExecutorBuilder { let table = node.get_table()?; let vnodes = params.vnode_bitmap.map(Arc::new); let state_table = StateTable::from_table_catalog(table, store, vnodes).await; - let storage_key = table.get_pk().iter().map(OrderPair::from_prost).collect(); - let order_by = 
node.order_by.iter().map(OrderPair::from_prost).collect(); + let storage_key = table + .get_pk() + .iter() + .map(OrderPair::from_protobuf) + .collect(); + let order_by = node.order_by.iter().map(OrderPair::from_protobuf).collect(); assert_eq!(¶ms.pk_indices, input.pk_indices()); if node.with_ties { diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index 0cb224d94415c..7188fdb1f27f8 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -130,6 +130,7 @@ async fn compaction_test( row_id_index: None, version: None, watermark_indices: vec![], + dist_key_in_pk: vec![], }; let mut delete_range_table = delete_key_table.clone(); delete_range_table.id = 2;
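A closing note on the `dist_key_in_pk: vec![]` fields added to the test catalogs here and in the filter-key-extractor test: they exist because `StateTable::from_table_catalog` (state_table.rs hunk above) now prefers a `dist_key_in_pk` list precomputed in the proto and only derives the mapping as a fallback. A self-contained sketch of what that fallback computes — the real helper is `get_dist_key_in_pk_indices`; this body and its panic message are illustrative:

    /// Position of each distribution-key column within the primary key.
    fn get_dist_key_in_pk_indices(dist_key: &[usize], pk: &[usize]) -> Vec<usize> {
        dist_key
            .iter()
            .map(|d| {
                pk.iter()
                    .position(|p| p == d)
                    .expect("distribution key must be a subset of the primary key")
            })
            .collect()
    }

    fn main() {
        // pk columns (3, 1, 4): dist keys 1 and 4 sit at pk positions 1 and 2.
        assert_eq!(get_dist_key_in_pk_indices(&[1, 4], &[3, 1, 4]), vec![1, 2]);
    }

Carrying the precomputed list in the catalog makes the mapping explicit rather than re-derived at every state-table construction, which appears to be the point of the FIXME above.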