refactor test cases
StrikeW committed Jun 5, 2023
1 parent: d9f9d49 · commit: 833a753
Showing 10 changed files with 88 additions and 58 deletions.
@@ -14,7 +14,6 @@

package com.risingwave.connector.api;

import com.google.common.collect.Lists;
import com.risingwave.connector.api.sink.SinkRow;
import com.risingwave.proto.ConnectorServiceProto;
import com.risingwave.proto.Data;
@@ -75,37 +74,6 @@ public List<ColumnDesc> getColumnDescs() {
return columnDescs;
}

public static TableSchema getMockTableSchema() {
return new TableSchema(
Lists.newArrayList("id", "name"),
Lists.newArrayList(
Data.DataType.newBuilder().setTypeName(TypeName.INT32).build(),
Data.DataType.newBuilder().setTypeName(TypeName.VARCHAR).build()),
Lists.newArrayList("id"));
}

public static ConnectorServiceProto.TableSchema getMockTableProto() {
return ConnectorServiceProto.TableSchema.newBuilder()
.addColumns(
ConnectorServiceProto.TableSchema.Column.newBuilder()
.setName("id")
.setDataType(
Data.DataType.newBuilder()
.setTypeName(TypeName.INT32)
.build())
.build())
.addColumns(
ConnectorServiceProto.TableSchema.Column.newBuilder()
.setName("name")
.setDataType(
Data.DataType.newBuilder()
.setTypeName(TypeName.VARCHAR)
.build())
.build())
.addAllPkIndices(List.of(1))
.build();
}

public Object getFromRow(String columnName, SinkRow row) {
return row.get(columnIndices.get(columnName));
}
@@ -0,0 +1,59 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.risingwave.connector;

import com.google.common.collect.Lists;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.proto.ConnectorServiceProto;
import com.risingwave.proto.Data;
import java.util.List;

public class TestUtils {

public static TableSchema getMockTableSchema() {
return new TableSchema(
Lists.newArrayList("id", "name"),
Lists.newArrayList(
Data.DataType.newBuilder()
.setTypeName(Data.DataType.TypeName.INT32)
.build(),
Data.DataType.newBuilder()
.setTypeName(Data.DataType.TypeName.VARCHAR)
.build()),
Lists.newArrayList("id"));
}

public static ConnectorServiceProto.TableSchema getMockTableProto() {
return ConnectorServiceProto.TableSchema.newBuilder()
.addColumns(
ConnectorServiceProto.TableSchema.Column.newBuilder()
.setName("id")
.setDataType(
Data.DataType.newBuilder()
.setTypeName(Data.DataType.TypeName.INT32)
.build())
.build())
.addColumns(
ConnectorServiceProto.TableSchema.Column.newBuilder()
.setName("name")
.setDataType(
Data.DataType.newBuilder()
.setTypeName(Data.DataType.TypeName.VARCHAR)
.build())
.build())
.addAllPkIndices(List.of(1))
.build();
}
}
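
For orientation, here is a minimal sketch of how a test might consume the relocated helpers. The class name `TestUtilsUsageExample` is hypothetical, and the only `TableSchema` method it assumes is `getColumnDescs()`, which appears earlier in this diff.

package com.risingwave.connector;

import com.risingwave.connector.api.TableSchema;
import junit.framework.TestCase;

public class TestUtilsUsageExample extends TestCase {
    // Hypothetical test: checks that the shared mock schema exposes the two
    // columns ("id" INT32, "name" VARCHAR) declared in TestUtils above.
    public void testMockSchemaShape() {
        TableSchema schema = TestUtils.getMockTableSchema();
        assertEquals(2, schema.getColumnDescs().size());
    }
}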
@@ -12,9 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package com.risingwave.connector;
package com.risingwave.connector.sink;

import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.JsonDeserializer;
import com.risingwave.connector.TestUtils;
import com.risingwave.connector.api.sink.SinkRow;
import com.risingwave.proto.ConnectorServiceProto;
import com.risingwave.proto.ConnectorServiceProto.SinkStreamRequest.WriteBatch.JsonPayload;
@@ -23,7 +24,7 @@

public class DeserializerTest extends TestCase {
public void testJsonDeserializer() {
JsonDeserializer deserializer = new JsonDeserializer(TableSchema.getMockTableSchema());
JsonDeserializer deserializer = new JsonDeserializer(TestUtils.getMockTableSchema());
JsonPayload jsonPayload =
JsonPayload.newBuilder()
.addRowOps(
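
The payload construction in testJsonDeserializer is truncated above; the sketch below shows one way a complete row op for the mock schema (id INT32, name VARCHAR) could be built. The RowOp field names (`opType`, `line`) are assumptions about the JsonPayload message, not something this diff confirms.

        // Sketch only: assumes JsonPayload.RowOp carries an op type plus a
        // JSON-encoded row; adjust field names to the actual proto definition.
        JsonPayload jsonPayload =
                JsonPayload.newBuilder()
                        .addRowOps(
                                JsonPayload.RowOp.newBuilder()
                                        .setOpType(Data.Op.INSERT)
                                        .setLine("{\"id\": 1, \"name\": \"Alice\"}")
                                        .build())
                        .build();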
@@ -12,13 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package com.risingwave.connector;
package com.risingwave.connector.sink;

import static com.risingwave.proto.Data.*;
import static org.junit.Assert.*;

import com.google.common.collect.Iterators;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.FileSink;
import com.risingwave.connector.FileSinkConfig;
import com.risingwave.connector.TestUtils;
import com.risingwave.connector.api.sink.ArraySinkRow;
import java.io.IOException;
import java.nio.file.Files;
@@ -36,7 +38,7 @@ public void testSync() throws IOException {
}

FileSinkConfig config = new FileSinkConfig(path);
FileSink sink = new FileSink(config, TableSchema.getMockTableSchema());
FileSink sink = new FileSink(config, TestUtils.getMockTableSchema());
String filePath = sink.getSinkPath();

Path file = Paths.get(filePath);
@@ -78,7 +80,7 @@ public void testWrite() throws IOException {
Files.createDirectories(Paths.get(path));
}
FileSinkConfig config = new FileSinkConfig(path);
FileSink sink = new FileSink(config, TableSchema.getMockTableSchema());
FileSink sink = new FileSink(config, TestUtils.getMockTableSchema());

String filePath = sink.getSinkPath();
try {
@@ -107,7 +109,7 @@ public void testDrop() throws IOException {
Files.createDirectories(Paths.get(path));
}
FileSinkConfig config = new FileSinkConfig(path);
FileSink sink = new FileSink(config, TableSchema.getMockTableSchema());
FileSink sink = new FileSink(config, TestUtils.getMockTableSchema());

sink.drop();

@@ -12,9 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package com.risingwave.connector;
package com.risingwave.connector.sink;

import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.SinkStreamObserver;
import com.risingwave.connector.TestUtils;
import com.risingwave.proto.ConnectorServiceProto;
import com.risingwave.proto.ConnectorServiceProto.SinkConfig;
import com.risingwave.proto.Data.Op;
@@ -27,7 +28,7 @@ public class SinkStreamObserverTest {

public SinkConfig fileSinkConfig =
SinkConfig.newBuilder()
.setTableSchema(TableSchema.getMockTableProto())
.setTableSchema(TestUtils.getMockTableProto())
.setConnectorType("file")
.putAllProperties(Map.of("output.path", "/tmp/rw-connector"))
.build();
@@ -19,7 +19,7 @@

import com.google.common.collect.Iterators;
import com.risingwave.connector.DeltaLakeSink;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.TestUtils;
import com.risingwave.connector.api.sink.ArraySinkRow;
import io.delta.standalone.DeltaLog;
import java.io.IOException;
@@ -43,7 +43,7 @@ private static DeltaLakeSink createMockSink(String location) {
DeltaLakeSinkFactoryTest.createMockTable(location);
Configuration conf = new Configuration();
DeltaLog log = DeltaLog.forTable(conf, location);
return new DeltaLakeSink(TableSchema.getMockTableSchema(), conf, log);
return new DeltaLakeSink(TestUtils.getMockTableSchema(), conf, log);
}

private void validateTableWithSpark(String location, List<Row> rows, StructType schema) {
@@ -15,7 +15,7 @@
package com.risingwave.connector.sink.deltalake;

import com.risingwave.connector.DeltaLakeSinkFactory;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.TestUtils;
import io.delta.standalone.DeltaLog;
import io.delta.standalone.Operation;
import io.delta.standalone.OptimisticTransaction;
@@ -49,7 +49,7 @@ public static void createMockTable(String location) {
Configuration conf = new Configuration();
DeltaLog log = DeltaLog.forTable(conf, location);

// should be synchronized with `TableSchema.getMockTableSchema()`;
// should be synchronized with `TestUtils.getMockTableSchema()`;
StructType schema =
new StructType(
new StructField[] {
@@ -73,7 +73,7 @@ public void testCreate() throws IOException {
createMockTable(location);
DeltaLakeSinkFactory sinkFactory = new DeltaLakeSinkFactory();
sinkFactory.create(
TableSchema.getMockTableSchema(),
TestUtils.getMockTableSchema(),
new HashMap<>() {
{
put("location", String.format("file://%s", location));
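
The createMockTable hunk above notes that the Delta Lake schema should stay synchronized with `TestUtils.getMockTableSchema()`, but the StructType construction is cut off. Below is an illustrative sketch of a matching schema built with delta-standalone's types API; the wrapper class name is hypothetical and the snippet is not the file's verbatim contents.

import io.delta.standalone.types.IntegerType;
import io.delta.standalone.types.StringType;
import io.delta.standalone.types.StructField;
import io.delta.standalone.types.StructType;

class MockDeltaSchema {
    // Mirrors the mock table schema: "id" INT32 -> IntegerType, "name" VARCHAR -> StringType.
    static StructType mockSchema() {
        return new StructType(
                new StructField[] {
                    new StructField("id", new IntegerType()),
                    new StructField("name", new StringType())
                });
    }
}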
@@ -18,7 +18,7 @@

import com.risingwave.connector.IcebergSink;
import com.risingwave.connector.IcebergSinkFactory;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.TestUtils;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
@@ -64,7 +64,7 @@ public void testCreate() throws IOException {
IcebergSink sink =
(IcebergSink)
sinkFactory.create(
TableSchema.getMockTableSchema(),
TestUtils.getMockTableSchema(),
Map.of(
"type",
sinkMode,
@@ -21,7 +21,7 @@
import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
import com.risingwave.connector.IcebergSink;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.TestUtils;
import com.risingwave.connector.api.sink.ArraySinkRow;
import java.io.IOException;
import java.nio.file.Files;
@@ -110,7 +110,7 @@ public void testSync() throws IOException {
TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName);
IcebergSink sink =
new IcebergSink(
TableSchema.getMockTableSchema(),
TestUtils.getMockTableSchema(),
hadoopCatalog,
hadoopCatalog.loadTable(tableIdentifier),
FileFormat.PARQUET);
@@ -153,7 +153,7 @@ public void testWrite() throws IOException {
TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName);
IcebergSink sink =
new IcebergSink(
TableSchema.getMockTableSchema(),
TestUtils.getMockTableSchema(),
hadoopCatalog,
hadoopCatalog.loadTable(tableIdentifier),
FileFormat.PARQUET);
@@ -189,7 +189,7 @@ public void testDrop() throws IOException {
TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName);
IcebergSink sink =
new IcebergSink(
TableSchema.getMockTableSchema(),
TestUtils.getMockTableSchema(),
hadoopCatalog,
hadoopCatalog.loadTable(tableIdentifier),
FileFormat.PARQUET);
@@ -20,7 +20,6 @@

import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.ArraySinkRow;
import java.io.IOException;
import java.nio.file.Files;
@@ -109,7 +108,7 @@ public void testSync() throws IOException {
TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName);
UpsertIcebergSink sink =
new UpsertIcebergSink(
TableSchema.getMockTableSchema(),
TestUtils.getMockTableSchema(),
hadoopCatalog,
hadoopCatalog.loadTable(tableIdentifier),
FileFormat.PARQUET);
@@ -152,7 +151,7 @@ public void testWrite() throws IOException {
TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName);
UpsertIcebergSink sink =
new UpsertIcebergSink(
TableSchema.getMockTableSchema(),
TestUtils.getMockTableSchema(),
hadoopCatalog,
hadoopCatalog.loadTable(tableIdentifier),
FileFormat.PARQUET);
@@ -198,7 +197,7 @@ public void testDrop() throws IOException {
TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName);
UpsertIcebergSink sink =
new UpsertIcebergSink(
TableSchema.getMockTableSchema(),
TestUtils.getMockTableSchema(),
hadoopCatalog,
hadoopCatalog.loadTable(tableIdentifier),
FileFormat.PARQUET);