Skip to content

Commit

Permalink
[Improve][Connector-V2] Optimize the code
Browse files Browse the repository at this point in the history
  • Loading branch information
dailai committed Jun 24, 2024
1 parent 244fe08 commit 31f52c5
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,4 +191,7 @@ protected Column(

/** Returns a copy of the column with a replaced name. */
public abstract Column rename(String newColumnName);

/** Returns a copy of the column with a replaced sourceType. */
public abstract Column reSourceType(String sourceType);
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,9 @@ public Column rename(String newColumnName) {
defaultValue,
comment);
}

@Override
public Column reSourceType(String sourceType) {
throw new UnsupportedOperationException("Not implemented");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -325,4 +325,22 @@ public Column rename(String newColumnName) {
bitLen,
longColumnLength);
}

@Override
public Column reSourceType(String newSourceType) {
return new PhysicalColumn(
name,
dataType,
columnLength,
scale,
nullable,
defaultValue,
comment,
newSourceType,
options,
isUnsigned,
isZeroFill,
bitLen,
longColumnLength);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,11 @@

import org.apache.seatunnel.api.table.catalog.TableIdentifier;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

@ToString(callSuper = true)
public abstract class AlterTableColumnEvent extends AlterTableEvent {

@Getter @Setter protected String sourceColumnType;

public AlterTableColumnEvent(TableIdentifier tableIdentifier) {
super(tableIdentifier);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,24 +94,23 @@ public void exitAlterByAddColumn(MySqlParser.AlterByAddColumnContext ctx) {
parser.runIfNotNull(
() -> {
Column column = columnDefinitionListener.getColumn();
org.apache.seatunnel.api.table.catalog.Column seatunnelColumn =
toSeatunnelColumn(column);
String sourceColumnType = getSourceColumnType(column);
seatunnelColumn = seatunnelColumn.reSourceType(sourceColumnType);
if (ctx.FIRST() != null) {
AlterTableAddColumnEvent alterTableAddColumnEvent =
AlterTableAddColumnEvent.addFirst(
tableIdentifier, toSeatunnelColumn(column));
alterTableAddColumnEvent.setSourceColumnType(getSourceColumnType(column));
AlterTableAddColumnEvent.addFirst(tableIdentifier, seatunnelColumn);
changes.add(alterTableAddColumnEvent);
} else if (ctx.AFTER() != null) {
String afterColumn = parser.parseName(ctx.uid(1));
AlterTableAddColumnEvent alterTableAddColumnEvent =
AlterTableAddColumnEvent.addAfter(
tableIdentifier, toSeatunnelColumn(column), afterColumn);
alterTableAddColumnEvent.setSourceColumnType(getSourceColumnType(column));
tableIdentifier, seatunnelColumn, afterColumn);
changes.add(alterTableAddColumnEvent);
} else {
AlterTableAddColumnEvent alterTableAddColumnEvent =
AlterTableAddColumnEvent.add(
tableIdentifier, toSeatunnelColumn(column));
alterTableAddColumnEvent.setSourceColumnType(getSourceColumnType(column));
AlterTableAddColumnEvent.add(tableIdentifier, seatunnelColumn);
changes.add(alterTableAddColumnEvent);
}
listeners.remove(columnDefinitionListener);
Expand Down Expand Up @@ -153,27 +152,25 @@ public void exitAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx)
parser.runIfNotNull(
() -> {
Column column = columnDefinitionListener.getColumn();
org.apache.seatunnel.api.table.catalog.Column seatunnelColumn =
toSeatunnelColumn(column);
String sourceColumnType = getSourceColumnType(column);
seatunnelColumn = seatunnelColumn.reSourceType(sourceColumnType);
if (ctx.FIRST() != null) {
AlterTableModifyColumnEvent alterTableModifyColumnEvent =
AlterTableModifyColumnEvent.modifyFirst(
tableIdentifier, toSeatunnelColumn(column));
alterTableModifyColumnEvent.setSourceColumnType(
getSourceColumnType(column));
tableIdentifier, seatunnelColumn);
changes.add(alterTableModifyColumnEvent);
} else if (ctx.AFTER() != null) {
String afterColumn = parser.parseName(ctx.uid(1));
AlterTableModifyColumnEvent alterTableModifyColumnEvent =
AlterTableModifyColumnEvent.modifyAfter(
tableIdentifier, toSeatunnelColumn(column), afterColumn);
alterTableModifyColumnEvent.setSourceColumnType(
getSourceColumnType(column));
tableIdentifier, seatunnelColumn, afterColumn);
changes.add(alterTableModifyColumnEvent);
} else {
AlterTableModifyColumnEvent alterTableModifyColumnEvent =
AlterTableModifyColumnEvent.modify(
tableIdentifier, toSeatunnelColumn(column));
alterTableModifyColumnEvent.setSourceColumnType(
getSourceColumnType(column));
tableIdentifier, seatunnelColumn);
changes.add(alterTableModifyColumnEvent);
}
listeners.remove(columnDefinitionListener);
Expand All @@ -199,16 +196,18 @@ public void exitAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx)
parser.runIfNotNull(
() -> {
Column column = columnDefinitionListener.getColumn();
org.apache.seatunnel.api.table.catalog.Column seatunnelColumn =
toSeatunnelColumn(column);
String sourceColumnType = getSourceColumnType(column);
seatunnelColumn = seatunnelColumn.reSourceType(sourceColumnType);
String oldColumnName = column.name();
String newColumnName = parser.parseName(ctx.newColumn);
Column newColumn = column.edit().name(newColumnName).create();
seatunnelColumn = seatunnelColumn.rename(newColumnName);
AlterTableChangeColumnEvent alterTableChangeColumnEvent =
AlterTableChangeColumnEvent.change(
tableIdentifier, oldColumnName, toSeatunnelColumn(newColumn));
tableIdentifier, oldColumnName, seatunnelColumn);
if (StringUtils.isNotBlank(newColumnName)
&& !StringUtils.equals(oldColumnName, newColumnName)) {
alterTableChangeColumnEvent.setSourceColumnType(
getSourceColumnType(newColumn));
changes.add(alterTableChangeColumnEvent);
}
listeners.remove(columnDefinitionListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ default String generateAlterTableSql(
Column addColumn = ((AlterTableAddColumnEvent) event).getColumn();
return buildAlterTableSql(
sourceDialectName,
event.getSourceColumnType(),
addColumn.getSourceType(),
AlterType.ADD.name(),
addColumn,
tableIdentifierWithQuoted,
Expand All @@ -477,7 +477,7 @@ default String generateAlterTableSql(
String dropColumn = ((AlterTableDropColumnEvent) event).getColumn();
return buildAlterTableSql(
sourceDialectName,
event.getSourceColumnType(),
null,
AlterType.DROP.name(),
null,
tableIdentifierWithQuoted,
Expand All @@ -486,7 +486,7 @@ default String generateAlterTableSql(
Column modifyColumn = ((AlterTableModifyColumnEvent) event).getColumn();
return buildAlterTableSql(
sourceDialectName,
event.getSourceColumnType(),
modifyColumn.getSourceType(),
AlterType.MODIFY.name(),
modifyColumn,
tableIdentifierWithQuoted,
Expand All @@ -498,7 +498,7 @@ default String generateAlterTableSql(
String oldColumnName = alterTableChangeColumnEvent.getOldColumn();
return buildAlterTableSql(
sourceDialectName,
event.getSourceColumnType(),
changeColumn.getSourceType(),
AlterType.CHANGE.name(),
changeColumn,
tableIdentifierWithQuoted,
Expand Down

0 comments on commit 31f52c5

Please sign in to comment.