diff --git a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/SinkRowMap.java b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/SinkRowMap.java index 7bad8abea65ef..a817e715a45d3 100644 --- a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/SinkRowMap.java +++ b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/SinkRowMap.java @@ -19,6 +19,7 @@ import io.grpc.Status; import java.util.List; import java.util.TreeMap; +import org.apache.iceberg.data.Record; public class SinkRowMap { TreeMap>, SinkRowOp> map = new TreeMap<>(new PkComparator()); @@ -27,7 +28,7 @@ public void clear() { map.clear(); } - public void insert(List> key, SinkRow row) { + public void insert(List> key, Record row) { if (!map.containsKey(key)) { map.put(key, SinkRowOp.insertOp(row)); } else { @@ -42,19 +43,20 @@ public void insert(List> key, SinkRow row) { } } - public void delete(List> key, SinkRow row) { + public void delete(List> key, Record row) { if (!map.containsKey(key)) { map.put(key, SinkRowOp.deleteOp(row)); } else { SinkRowOp sinkRowOp = map.get(key); - SinkRow insert = sinkRowOp.getInsert(); + Record insert = sinkRowOp.getInsert(); if (insert == null) { throw Status.FAILED_PRECONDITION .withDescription("try to double delete a primary key") .asRuntimeException(); } - assertRowValuesEqual(insert, row); - SinkRow delete = sinkRowOp.getDelete(); + // TODO: may enable it again + // assertRowValuesEqual(insert, row); + Record delete = sinkRowOp.getDelete(); if (delete != null) { map.put(key, SinkRowOp.deleteOp(delete)); } else { diff --git a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/SinkRowOp.java b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/SinkRowOp.java index 1ac28644a75b2..67b42b078914a 100644 --- a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/SinkRowOp.java +++ b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/SinkRowOp.java @@ -14,14 +14,14 @@ package com.risingwave.connector; -import com.risingwave.connector.api.sink.SinkRow; import io.grpc.Status; +import org.apache.iceberg.data.Record; public class SinkRowOp { - private final SinkRow delete; - private final SinkRow insert; + private final Record delete; + private final Record insert; - public static SinkRowOp insertOp(SinkRow row) { + public static SinkRowOp insertOp(Record row) { if (row == null) { throw Status.FAILED_PRECONDITION .withDescription("row op must not be null to initialize insertOp") @@ -30,7 +30,7 @@ public static SinkRowOp insertOp(SinkRow row) { return new SinkRowOp(null, row); } - public static SinkRowOp deleteOp(SinkRow row) { + public static SinkRowOp deleteOp(Record row) { if (row == null) { throw Status.FAILED_PRECONDITION .withDescription("row op must not be null to initialize deleteOp") @@ -39,7 +39,7 @@ public static SinkRowOp deleteOp(SinkRow row) { return new SinkRowOp(row, null); } - public static SinkRowOp updateOp(SinkRow delete, SinkRow insert) { + public static SinkRowOp updateOp(Record delete, Record insert) { if (delete == null || insert == null) { throw Status.FAILED_PRECONDITION .withDescription("row ops must not be null initialize updateOp") @@ -48,7 +48,7 @@ public static SinkRowOp updateOp(SinkRow delete, SinkRow insert) { return new SinkRowOp(delete, insert); } - private SinkRowOp(SinkRow delete, SinkRow insert) { + private SinkRowOp(Record delete, Record insert) { this.delete = delete; this.insert = insert; } @@ -57,11 +57,11 @@ public boolean isDelete() { return insert == null && delete != null; } - public SinkRow getDelete() { + public Record getDelete() { return delete; } - public SinkRow getInsert() { + public Record getInsert() { return insert; } } diff --git a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/UpsertIcebergSink.java b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/UpsertIcebergSink.java index a0522602a4c32..8c26f7b3659e2 100644 --- a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/UpsertIcebergSink.java +++ b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/UpsertIcebergSink.java @@ -75,7 +75,7 @@ public UpsertIcebergSink( .collect(Collectors.toList()); } - private Record newRecord(Schema schema, SinkRow row) { + private static Record newRecord(Schema schema, SinkRow row) { Record record = GenericRecord.create(schema); for (int i = 0; i < schema.columns().size(); i++) { record.set(i, row.get(i)); @@ -174,10 +174,10 @@ public void write(Iterator rows) { } switch (row.getOp()) { case INSERT: - sinkRowMap.insert(getKeyFromRow(row), row); + sinkRowMap.insert(getKeyFromRow(row), newRecord(rowSchema, row)); break; case DELETE: - sinkRowMap.delete(getKeyFromRow(row), row); + sinkRowMap.delete(getKeyFromRow(row), newRecord(deleteRowSchema, row)); break; case UPDATE_DELETE: if (updateBufferExists) { @@ -186,7 +186,7 @@ public void write(Iterator rows) { "an UPDATE_INSERT should precede an UPDATE_DELETE") .asRuntimeException(); } - sinkRowMap.delete(getKeyFromRow(row), row); + sinkRowMap.delete(getKeyFromRow(row), newRecord(deleteRowSchema, row)); updateBufferExists = true; break; case UPDATE_INSERT: @@ -196,7 +196,7 @@ public void write(Iterator rows) { "an UPDATE_INSERT should precede an UPDATE_DELETE") .asRuntimeException(); } - sinkRowMap.insert(getKeyFromRow(row), row); + sinkRowMap.insert(getKeyFromRow(row), newRecord(rowSchema, row)); updateBufferExists = false; break; default: @@ -217,13 +217,13 @@ public void sync() { newEqualityDeleteWriter(entry.getKey()); DataWriter dataWriter = newDataWriter(entry.getKey()); for (SinkRowOp sinkRowOp : entry.getValue().map.values()) { - SinkRow insert = sinkRowOp.getInsert(); - SinkRow delete = sinkRowOp.getDelete(); + Record insert = sinkRowOp.getInsert(); + Record delete = sinkRowOp.getDelete(); if (insert != null) { - dataWriter.write(newRecord(rowSchema, insert)); + dataWriter.write(insert); } if (delete != null) { - equalityDeleteWriter.write(newRecord(deleteRowSchema, delete)); + equalityDeleteWriter.write(delete); } } try { diff --git a/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/SinkRowMapTest.java b/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/SinkRowMapTest.java index f4cdd8d988c71..c5048aca40762 100644 --- a/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/SinkRowMapTest.java +++ b/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/SinkRowMapTest.java @@ -21,6 +21,10 @@ import com.risingwave.proto.Data; import java.util.ArrayList; import java.util.List; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.types.Types; import org.junit.Assert; import org.junit.Test; @@ -31,29 +35,42 @@ public void testInsert() { SinkRow row = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1); List> key = new ArrayList<>(); key.add((Comparable) row.get(0)); + Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.IntegerType.get())); + Record r = GenericRecord.create(schema); + r.set(0, row.get(0)); - sinkRowMap.insert(key, row); + sinkRowMap.insert(key, r); assertEquals(1, sinkRowMap.map.size()); assertEquals(null, sinkRowMap.map.get(key).getDelete()); - assertEquals(row, sinkRowMap.map.get(key).getInsert()); + assertEquals(r, sinkRowMap.map.get(key).getInsert()); } @Test public void testInsertAfterDelete() { SinkRowMap sinkRowMap = new SinkRowMap(); + Schema schema = + new Schema( + Types.NestedField.optional(0, "id", Types.IntegerType.get()), + Types.NestedField.optional(1, "name", Types.StringType.get())); SinkRow row1 = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1, "Alice"); List> key1 = new ArrayList<>(); key1.add((Comparable) row1.get(0)); + Record r1 = GenericRecord.create(schema); + r1.set(0, row1.get(0)); + r1.set(1, row1.get(1)); SinkRow row2 = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1, "Bob"); List> key2 = new ArrayList<>(); key2.add((Comparable) row2.get(0)); + Record r2 = GenericRecord.create(schema); + r2.set(0, row2.get(0)); + r2.set(1, row2.get(1)); - sinkRowMap.delete(key1, row1); - sinkRowMap.insert(key1, row2); + sinkRowMap.delete(key1, r1); + sinkRowMap.insert(key1, r2); assertEquals(1, sinkRowMap.map.size()); - assertEquals(row1, sinkRowMap.map.get(key1).getDelete()); - assertEquals(row2, sinkRowMap.map.get(key1).getInsert()); + assertEquals(r1, sinkRowMap.map.get(key1).getDelete()); + assertEquals(r2, sinkRowMap.map.get(key1).getInsert()); } @Test @@ -62,11 +79,14 @@ public void testInsertAfterInsert() { SinkRow row = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1); List> key = new ArrayList<>(); key.add((Comparable) row.get(0)); + Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.IntegerType.get())); + Record r = GenericRecord.create(schema); + r.set(0, row.get(0)); - sinkRowMap.insert(key, row); + sinkRowMap.insert(key, r); boolean exceptionThrown = false; try { - sinkRowMap.insert(key, row); + sinkRowMap.insert(key, r); } catch (RuntimeException e) { exceptionThrown = true; Assert.assertTrue( @@ -87,10 +107,14 @@ public void testDelete() { List> key = new ArrayList<>(); key.add((Comparable) row.get(0)); - sinkRowMap.delete(key, row); + Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.IntegerType.get())); + Record r = GenericRecord.create(schema); + r.set(0, row.get(0)); + + sinkRowMap.delete(key, r); assertEquals(1, sinkRowMap.map.size()); assertEquals(null, sinkRowMap.map.get(key).getInsert()); - assertEquals(row, sinkRowMap.map.get(key).getDelete()); + assertEquals(r, sinkRowMap.map.get(key).getDelete()); } @Test @@ -100,10 +124,14 @@ public void testDeleteAfterDelete() { List> key = new ArrayList<>(); key.add((Comparable) row.get(0)); - sinkRowMap.delete(key, row); + Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.IntegerType.get())); + Record r = GenericRecord.create(schema); + r.set(0, row.get(0)); + + sinkRowMap.delete(key, r); boolean exceptionThrown = false; try { - sinkRowMap.delete(key, row); + sinkRowMap.delete(key, r); } catch (RuntimeException e) { exceptionThrown = true; Assert.assertTrue( @@ -122,8 +150,12 @@ public void testDeleteAfterInsert() { List> key = new ArrayList<>(); key.add((Comparable) row.get(0)); - sinkRowMap.insert(key, row); - sinkRowMap.delete(key, row); + Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.IntegerType.get())); + Record r = GenericRecord.create(schema); + r.set(0, row.get(0)); + + sinkRowMap.insert(key, r); + sinkRowMap.delete(key, r); assertEquals(0, sinkRowMap.map.size()); } @@ -131,19 +163,31 @@ public void testDeleteAfterInsert() { public void testDeleteAfterUpdate() { SinkRowMap sinkRowMap = new SinkRowMap(); + Schema schema = + new Schema( + Types.NestedField.optional(0, "id", Types.IntegerType.get()), + Types.NestedField.optional(1, "name", Types.StringType.get())); + SinkRow row1 = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1, "Alice"); List> key1 = new ArrayList<>(); key1.add((Comparable) row1.get(0)); + Record r1 = GenericRecord.create(schema); + r1.set(0, row1.get(0)); + r1.set(1, row1.get(1)); + SinkRow row2 = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1, "Clare"); List> key2 = new ArrayList<>(); key2.add((Comparable) row2.get(0)); + Record r2 = GenericRecord.create(schema); + r2.set(0, row2.get(0)); + r2.set(1, row2.get(1)); - sinkRowMap.delete(key1, row1); - sinkRowMap.insert(key2, row2); - sinkRowMap.delete(key2, row2); + sinkRowMap.delete(key1, r1); + sinkRowMap.insert(key2, r2); + sinkRowMap.delete(key2, r2); assertEquals(1, sinkRowMap.map.size()); assertEquals(null, sinkRowMap.map.get(key1).getInsert()); - assertEquals(row1, sinkRowMap.map.get(key1).getDelete()); + assertEquals(r1, sinkRowMap.map.get(key1).getDelete()); } @Test @@ -153,7 +197,10 @@ public void testClear() { SinkRow row = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1); List> key = new ArrayList<>(); key.add((Comparable) row.get(0)); - sinkRowMap.insert(key, row); + Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.IntegerType.get())); + Record r = GenericRecord.create(schema); + r.set(0, row.get(0)); + sinkRowMap.insert(key, r); sinkRowMap.clear(); assertEquals(0, sinkRowMap.map.size());