Skip to content

Commit

Permalink
feat(connector-node): support stream chunk payload in connector node (#…
Browse files Browse the repository at this point in the history
…8548)

Co-authored-by: William Wen <[email protected]>
  • Loading branch information
yufansong and wenym1 authored Mar 17, 2023
1 parent 8c95702 commit 99501d1
Show file tree
Hide file tree
Showing 17 changed files with 428 additions and 21 deletions.
5 changes: 5 additions & 0 deletions ci/scripts/e2e-iceberg-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,13 @@ echo "--- Download artifacts"
mkdir -p target/debug
buildkite-agent artifact download risingwave-"$profile" target/debug/
buildkite-agent artifact download risedev-dev-"$profile" target/debug/
buildkite-agent artifact download librisingwave_java_binding.so-"$profile" target/debug
mv target/debug/risingwave-"$profile" target/debug/risingwave
mv target/debug/risedev-dev-"$profile" target/debug/risedev-dev
mv target/debug/librisingwave_java_binding.so-"$profile" target/debug/librisingwave_java_binding.so

export RW_JAVA_BINDING_LIB_PATH=${PWD}/target/debug
export RW_CONNECTOR_RPC_SINK_PAYLOAD_FORMAT=stream_chunk

echo "--- Download connector node package"
buildkite-agent artifact download risingwave-connector.tar.gz ./
Expand Down
5 changes: 5 additions & 0 deletions ci/scripts/e2e-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,13 @@ echo "--- Download artifacts"
mkdir -p target/debug
buildkite-agent artifact download risingwave-"$profile" target/debug/
buildkite-agent artifact download risedev-dev-"$profile" target/debug/
buildkite-agent artifact download librisingwave_java_binding.so-"$profile" target/debug
mv target/debug/risingwave-"$profile" target/debug/risingwave
mv target/debug/risedev-dev-"$profile" target/debug/risedev-dev
mv target/debug/librisingwave_java_binding.so-"$profile" target/debug/librisingwave_java_binding.so

export RW_JAVA_BINDING_LIB_PATH=${PWD}/target/debug
export RW_CONNECTOR_RPC_SINK_PAYLOAD_FORMAT=stream_chunk

echo "--- Download connector node package"
buildkite-agent artifact download risingwave-connector.tar.gz ./
Expand Down
4 changes: 4 additions & 0 deletions ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@ echo "--- Download artifacts"
mkdir -p target/debug
buildkite-agent artifact download risingwave-"$profile" target/debug/
buildkite-agent artifact download risedev-dev-"$profile" target/debug/
buildkite-agent artifact download librisingwave_java_binding.so-"$profile" target/debug
mv target/debug/risingwave-"$profile" target/debug/risingwave
mv target/debug/risedev-dev-"$profile" target/debug/risedev-dev
mv target/debug/librisingwave_java_binding.so-"$profile" target/debug/librisingwave_java_binding.so

export RW_JAVA_BINDING_LIB_PATH=${PWD}/target/debug


echo "--- Download connector node package"
Expand Down
104 changes: 103 additions & 1 deletion dashboard/proto/gen/connector_service.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions dashboard/proto/gen/stream_plan.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ RUN rustup self update \

RUN cargo fetch

RUN cargo build -p risingwave_cmd_all --release --features "static-link static-log-level" && \
RUN cargo build -p risingwave_cmd_all -p risingwave_java_binding --release --features "static-link static-log-level" && \
mkdir -p /risingwave/bin && mv /risingwave/target/release/risingwave /risingwave/bin/ && \
mkdir -p /risingwave/lib && mv /risingwave/target/release/librisingwave_java_binding.so /risingwave/lib && \
cargo clean

RUN cd /risingwave/java && mvn -B package -Dmaven.test.skip=true && \
Expand All @@ -47,10 +48,13 @@ RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get -y install ca-certi

FROM image-base as risingwave
LABEL org.opencontainers.image.source https://github.com/risingwavelabs/risingwave
RUN mkdir -p /risingwave/bin/connector-node
RUN mkdir -p /risingwave/bin/connector-node && mkdir -p /risingwave/lib
COPY --from=builder /risingwave/bin/risingwave /risingwave/bin/risingwave
COPY --from=builder /risingwave/bin/connector-node /risingwave/bin/connector-node
COPY --from=builder /risingwave/ui /risingwave/ui
COPY --from=builder /risingwave/lib/librisingwave_java_binding.so /risingwave/lib/librisingwave_java_binding.so
# Set java.library.path env to /risingwave/lib
ENV RW_JAVA_BINDING_LIB_PATH /risingwave/lib
# Set default playground mode to docker-playground profile
ENV PLAYGROUND_PROFILE docker-playground
# Set default dashboard UI to local path instead of github proxy
Expand Down
2 changes: 1 addition & 1 deletion java/connector-node/assembly/scripts/start-service.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ if [ -z "${port}" ]; then
port=$PORT
fi

java -classpath "${DIR}/libs/*" $MAIN --port ${port}
java -classpath "${DIR}/libs/*" -Djava.library.path="${RW_JAVA_BINDING_LIB_PATH}" $MAIN --port ${port}
4 changes: 4 additions & 0 deletions java/connector-node/risingwave-connector-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
<groupId>com.risingwave.java</groupId>
<artifactId>proto</artifactId>
</dependency>
<dependency>
<groupId>com.risingwave.java</groupId>
<artifactId>java-binding</artifactId>
</dependency>
<dependency>
<groupId>com.risingwave.java</groupId>
<artifactId>connector-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.*;
import com.risingwave.connector.deserializer.StreamChunkDeserializer;
import com.risingwave.metrics.ConnectorNodeMetrics;
import com.risingwave.metrics.MonitoredRowIterator;
import com.risingwave.proto.ConnectorServiceProto;
Expand Down Expand Up @@ -202,6 +203,9 @@ private void bindSink(SinkConfig sinkConfig, ConnectorServiceProto.SinkPayloadFo
case JSON:
deserializer = new JsonDeserializer(tableSchema);
break;
case STREAM_CHUNK:
deserializer = new StreamChunkDeserializer(tableSchema);
break;
}
ConnectorNodeMetrics.incActiveConnections(sinkConfig.getConnectorType(), "node1");
}
Expand Down
Loading

0 comments on commit 99501d1

Please sign in to comment.