diff --git a/dashboard/proto/gen/connector_service.ts b/dashboard/proto/gen/connector_service.ts
index 8e8a96c48bd0d..ae78e31e5ee3b 100644
--- a/dashboard/proto/gen/connector_service.ts
+++ b/dashboard/proto/gen/connector_service.ts
@@ -14,6 +14,7 @@ export const protobufPackage = "connector_service";
 export const SinkPayloadFormat = {
   FORMAT_UNSPECIFIED: "FORMAT_UNSPECIFIED",
   JSON: "JSON",
+  STREAM_CHUNK: "STREAM_CHUNK",
   UNRECOGNIZED: "UNRECOGNIZED",
 } as const;
 
@@ -27,6 +28,9 @@ export function sinkPayloadFormatFromJSON(object: any): SinkPayloadFormat {
     case 1:
     case "JSON":
       return SinkPayloadFormat.JSON;
+    case 2:
+    case "STREAM_CHUNK":
+      return SinkPayloadFormat.STREAM_CHUNK;
     case -1:
     case "UNRECOGNIZED":
     default:
@@ -40,6 +44,8 @@ export function sinkPayloadFormatToJSON(object: SinkPayloadFormat): string {
       return "FORMAT_UNSPECIFIED";
     case SinkPayloadFormat.JSON:
       return "JSON";
+    case SinkPayloadFormat.STREAM_CHUNK:
+      return "STREAM_CHUNK";
     case SinkPayloadFormat.UNRECOGNIZED:
     default:
       return "UNRECOGNIZED";
@@ -126,7 +132,10 @@ export interface SinkStreamRequest_StartSink {
 }
 
 export interface SinkStreamRequest_WriteBatch {
-  payload?: { $case: "jsonPayload"; jsonPayload: SinkStreamRequest_WriteBatch_JsonPayload };
+  payload?: { $case: "jsonPayload"; jsonPayload: SinkStreamRequest_WriteBatch_JsonPayload } | {
+    $case: "streamChunkPayload";
+    streamChunkPayload: SinkStreamRequest_WriteBatch_StreamChunkPayload;
+  };
   batchId: number;
   epoch: number;
 }
@@ -140,6 +149,10 @@ export interface SinkStreamRequest_WriteBatch_JsonPayload_RowOp {
   line: string;
 }
 
+export interface SinkStreamRequest_WriteBatch_StreamChunkPayload {
+  binaryData: Uint8Array;
+}
+
 export interface SinkStreamRequest_StartEpoch {
   epoch: number;
 }
@@ -482,6 +495,11 @@ export const SinkStreamRequest_WriteBatch = {
     return {
       payload: isSet(object.jsonPayload)
        ? { $case: "jsonPayload", jsonPayload: SinkStreamRequest_WriteBatch_JsonPayload.fromJSON(object.jsonPayload) }
+        : isSet(object.streamChunkPayload)
+        ? {
+          $case: "streamChunkPayload",
+          streamChunkPayload: SinkStreamRequest_WriteBatch_StreamChunkPayload.fromJSON(object.streamChunkPayload),
+        }
        : undefined,
       batchId: isSet(object.batchId) ? Number(object.batchId) : 0,
       epoch: isSet(object.epoch) ? Number(object.epoch) : 0,
@@ -493,6 +511,9 @@
     message.payload?.$case === "jsonPayload" && (obj.jsonPayload = message.payload?.jsonPayload
       ? SinkStreamRequest_WriteBatch_JsonPayload.toJSON(message.payload?.jsonPayload)
       : undefined);
+    message.payload?.$case === "streamChunkPayload" && (obj.streamChunkPayload = message.payload?.streamChunkPayload
+      ? SinkStreamRequest_WriteBatch_StreamChunkPayload.toJSON(message.payload?.streamChunkPayload)
+      : undefined);
     message.batchId !== undefined && (obj.batchId = Math.round(message.batchId));
     message.epoch !== undefined && (obj.epoch = Math.round(message.epoch));
     return obj;
@@ -510,6 +531,18 @@
         jsonPayload: SinkStreamRequest_WriteBatch_JsonPayload.fromPartial(object.payload.jsonPayload),
       };
     }
+    if (
+      object.payload?.$case === "streamChunkPayload" &&
+      object.payload?.streamChunkPayload !== undefined &&
+      object.payload?.streamChunkPayload !== null
+    ) {
+      message.payload = {
+        $case: "streamChunkPayload",
+        streamChunkPayload: SinkStreamRequest_WriteBatch_StreamChunkPayload.fromPartial(
+          object.payload.streamChunkPayload,
+        ),
+      };
+    }
     message.batchId = object.batchId ?? 0;
     message.epoch = object.epoch ?? 0;
     return message;
@@ -577,6 +610,31 @@ export const SinkStreamRequest_WriteBatch_JsonPayload_RowOp = {
   },
 };
 
+function createBaseSinkStreamRequest_WriteBatch_StreamChunkPayload(): SinkStreamRequest_WriteBatch_StreamChunkPayload {
+  return { binaryData: new Uint8Array() };
+}
+
+export const SinkStreamRequest_WriteBatch_StreamChunkPayload = {
+  fromJSON(object: any): SinkStreamRequest_WriteBatch_StreamChunkPayload {
+    return { binaryData: isSet(object.binaryData) ? bytesFromBase64(object.binaryData) : new Uint8Array() };
+  },
+
+  toJSON(message: SinkStreamRequest_WriteBatch_StreamChunkPayload): unknown {
+    const obj: any = {};
+    message.binaryData !== undefined &&
+      (obj.binaryData = base64FromBytes(message.binaryData !== undefined ? message.binaryData : new Uint8Array()));
+    return obj;
+  },
+
+  fromPartial<I extends Exact<DeepPartial<SinkStreamRequest_WriteBatch_StreamChunkPayload>, I>>(
+    object: I,
+  ): SinkStreamRequest_WriteBatch_StreamChunkPayload {
+    const message = createBaseSinkStreamRequest_WriteBatch_StreamChunkPayload();
+    message.binaryData = object.binaryData ?? new Uint8Array();
+    return message;
+  },
+};
+
 function createBaseSinkStreamRequest_StartEpoch(): SinkStreamRequest_StartEpoch {
   return { epoch: 0 };
 }
@@ -1094,6 +1152,50 @@ export const GetEventStreamResponse = {
   },
 };
 
+declare var self: any | undefined;
+declare var window: any | undefined;
+declare var global: any | undefined;
+var globalThis: any = (() => {
+  if (typeof globalThis !== "undefined") {
+    return globalThis;
+  }
+  if (typeof self !== "undefined") {
+    return self;
+  }
+  if (typeof window !== "undefined") {
+    return window;
+  }
+  if (typeof global !== "undefined") {
+    return global;
+  }
+  throw "Unable to locate global object";
+})();
+
+function bytesFromBase64(b64: string): Uint8Array {
+  if (globalThis.Buffer) {
+    return Uint8Array.from(globalThis.Buffer.from(b64, "base64"));
+  } else {
+    const bin = globalThis.atob(b64);
+    const arr = new Uint8Array(bin.length);
+    for (let i = 0; i < bin.length; ++i) {
+      arr[i] = bin.charCodeAt(i);
+    }
+    return arr;
+  }
+}
+
+function base64FromBytes(arr: Uint8Array): string {
+  if (globalThis.Buffer) {
+    return globalThis.Buffer.from(arr).toString("base64");
+  } else {
+    const bin: string[] = [];
+    arr.forEach((byte) => {
+      bin.push(String.fromCharCode(byte));
+    });
+    return globalThis.btoa(bin.join(""));
+  }
+}
+
 type Builtin = Date | Function | Uint8Array | string | number | boolean | undefined;
 
 export type DeepPartial<T> = T extends Builtin ? T
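
For context, here is a minimal usage sketch of the new `streamChunkPayload` oneof case added by this diff; it is not part of the generated file or of the diff itself. The import path and the byte values are illustrative assumptions, but the types and helpers (`SinkStreamRequest_WriteBatch`, `SinkStreamRequest_WriteBatch_StreamChunkPayload`, `SinkPayloadFormat.STREAM_CHUNK`) are the ones introduced above.

```ts
// Hypothetical example, not generated code: exercise the new STREAM_CHUNK
// payload variant. Adjust the import path to your project layout.
import {
  SinkPayloadFormat,
  SinkStreamRequest_WriteBatch,
} from "./proto/gen/connector_service";

// Build a WriteBatch carrying an encoded stream chunk. The bytes are
// placeholders; a real chunk would be produced by the connector node.
const batch = SinkStreamRequest_WriteBatch.fromPartial({
  payload: {
    $case: "streamChunkPayload",
    streamChunkPayload: { binaryData: new Uint8Array([1, 2, 3]) },
  },
  batchId: 1,
  epoch: 42,
});

// toJSON encodes binaryData as base64 via base64FromBytes; fromJSON decodes
// it with bytesFromBase64 and restores the streamChunkPayload case.
const json = SinkStreamRequest_WriteBatch.toJSON(batch);
const restored = SinkStreamRequest_WriteBatch.fromJSON(json);
console.log(restored.payload?.$case); // "streamChunkPayload"
console.log(SinkPayloadFormat.STREAM_CHUNK); // "STREAM_CHUNK"
```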