Skip to content

Commit

Permalink
[Feature][Mongodb-CDC] support multi-table read
Browse files Browse the repository at this point in the history
  • Loading branch information
hawk9821 committed Nov 27, 2024
1 parent 1762486 commit 40af7e4
Showing 1 changed file with 8 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
Expand Down Expand Up @@ -189,11 +188,10 @@ public PreparedStatement toExternal(
throws SQLException {
SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType();
for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields(); fieldIndex++) {
Object fieldValue = null;
try {
SeaTunnelDataType<?> seaTunnelDataType = rowType.getFieldType(fieldIndex);
int statementIndex = fieldIndex + 1;
fieldValue = row.getField(fieldIndex);
Object fieldValue = row.getField(fieldIndex);
if (fieldValue == null) {
statement.setObject(statementIndex, null);
continue;
Expand Down Expand Up @@ -230,27 +228,28 @@ public PreparedStatement toExternal(
break;
case DATE:
LocalDate localDate = (LocalDate) row.getField(fieldIndex);
statement.setDate(statementIndex, Date.valueOf(localDate));
statement.setDate(statementIndex, java.sql.Date.valueOf(localDate));
break;
case TIME:
writeTime(statement, statementIndex, (LocalTime) row.getField(fieldIndex));
break;
case TIMESTAMP:
LocalDateTime localDateTime = (LocalDateTime) row.getField(fieldIndex);
statement.setTimestamp(statementIndex, Timestamp.valueOf(localDateTime));
statement.setTimestamp(
statementIndex, java.sql.Timestamp.valueOf(localDateTime));
break;
case BYTES:
statement.setBytes(statementIndex, (byte[]) row.getField(fieldIndex));
break;
case NULL:
statement.setNull(statementIndex, Types.NULL);
statement.setNull(statementIndex, java.sql.Types.NULL);
break;
case ARRAY:
SeaTunnelDataType elementType =
((ArrayType) seaTunnelDataType).getElementType();
Object[] array = (Object[]) row.getField(fieldIndex);
if (array == null) {
statement.setNull(statementIndex, Types.ARRAY);
statement.setNull(statementIndex, java.sql.Types.ARRAY);
break;
}
if (SqlType.TINYINT.equals(elementType.getSqlType())) {
Expand All @@ -273,10 +272,7 @@ public PreparedStatement toExternal(
} catch (Exception e) {
throw new JdbcConnectorException(
JdbcConnectorErrorCode.DATA_TYPE_CAST_FAILED,
"error field:"
+ rowType.getFieldNames()[fieldIndex]
+ "error value:"
+ fieldValue,
"error field:" + rowType.getFieldNames()[fieldIndex],
e);
}
}
Expand All @@ -285,6 +281,6 @@ public PreparedStatement toExternal(

protected void writeTime(PreparedStatement statement, int index, LocalTime time)
throws SQLException {
statement.setTime(index, Time.valueOf(time));
statement.setTime(index, java.sql.Time.valueOf(time));
}
}

0 comments on commit 40af7e4

Please sign in to comment.