Skip to content

Commit

Permalink
[Improve] Add SaveMode log of process detail (apache#6375)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hisoka-X authored and chaorongzhi committed Aug 21, 2024
1 parent 2d9a5a8 commit c5feee8
Show file tree
Hide file tree
Showing 26 changed files with 1,331 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,26 @@
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.util.Optional;

import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.SINK_TABLE_NOT_EXIST;
import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.SOURCE_ALREADY_HAS_DATA;

@AllArgsConstructor
@Slf4j
public class DefaultSaveModeHandler implements SaveModeHandler {

public SchemaSaveMode schemaSaveMode;
public DataSaveMode dataSaveMode;
public Catalog catalog;
public TablePath tablePath;
public CatalogTable catalogTable;
public String customSql;
@Nonnull public SchemaSaveMode schemaSaveMode;
@Nonnull public DataSaveMode dataSaveMode;
@Nonnull public Catalog catalog;
@Nonnull public TablePath tablePath;
@Nullable public CatalogTable catalogTable;
@Nullable public String customSql;

public DefaultSaveModeHandler(
SchemaSaveMode schemaSaveMode,
Expand Down Expand Up @@ -132,17 +139,58 @@ protected boolean tableExists() {
}

protected void dropTable() {
try {
log.info(
"Dropping table {} with action {}",
tablePath,
catalog.previewAction(
Catalog.ActionType.DROP_TABLE, tablePath, Optional.empty()));
} catch (UnsupportedOperationException ignore) {
log.info("Dropping table {}", tablePath);
}
catalog.dropTable(tablePath, true);
}

protected void createTable() {
if (!catalog.databaseExists(tablePath.getDatabaseName())) {
catalog.createDatabase(TablePath.of(tablePath.getDatabaseName(), ""), true);
TablePath databasePath = TablePath.of(tablePath.getDatabaseName(), "");
try {
log.info(
"Creating database {} with action {}",
tablePath.getDatabaseName(),
catalog.previewAction(
Catalog.ActionType.CREATE_DATABASE,
databasePath,
Optional.empty()));
} catch (UnsupportedOperationException ignore) {
log.info("Creating database {}", tablePath.getDatabaseName());
}
catalog.createDatabase(databasePath, true);
}
try {
log.info(
"Creating table {} with action {}",
tablePath,
catalog.previewAction(
Catalog.ActionType.CREATE_TABLE,
tablePath,
Optional.ofNullable(catalogTable)));
} catch (UnsupportedOperationException ignore) {
log.info("Creating table {}", tablePath);
}
catalog.createTable(tablePath, catalogTable, true);
}

protected void truncateTable() {
try {
log.info(
"Truncating table {} with action {}",
tablePath,
catalog.previewAction(
Catalog.ActionType.TRUNCATE_TABLE, tablePath, Optional.empty()));
} catch (UnsupportedOperationException ignore) {
log.info("Truncating table {}", tablePath);
}
catalog.truncateTable(tablePath, true);
}

Expand All @@ -151,9 +199,30 @@ protected boolean dataExists() {
}

protected void executeCustomSql() {
log.info("Executing custom SQL for table {} with SQL: {}", tablePath, customSql);
catalog.executeSql(tablePath, customSql);
}

@Override
public TablePath getHandleTablePath() {
return tablePath;
}

@Override
public Catalog getHandleCatalog() {
return catalog;
}

@Override
public SchemaSaveMode getSchemaSaveMode() {
return schemaSaveMode;
}

@Override
public DataSaveMode getDataSaveMode() {
return dataSaveMode;
}

@Override
public void close() throws Exception {
catalog.close();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.sink;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SaveModeExecuteWrapper {

public SaveModeExecuteWrapper(SaveModeHandler handler) {
this.handler = handler;
}

public void execute() {
log.info(
"Executing save mode for table: {}, with SchemaSaveMode: {}, DataSaveMode: {} using Catalog: {}",
handler.getHandleTablePath(),
handler.getSchemaSaveMode(),
handler.getDataSaveMode(),
handler.getHandleCatalog().name());
handler.handleSaveMode();
}

private final SaveModeHandler handler;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,23 @@

package org.apache.seatunnel.api.sink;

import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.TablePath;

public interface SaveModeHandler extends AutoCloseable {

void handleSchemaSaveMode();

void handleDataSaveMode();

SchemaSaveMode getSchemaSaveMode();

DataSaveMode getDataSaveMode();

TablePath getHandleTablePath();

Catalog getHandleCatalog();

default void handleSaveMode() {
// handleSchemaSaveMode();
// handleDataSaveMode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,19 @@ default boolean isExistsData(TablePath tablePath) {

default void executeSql(TablePath tablePath, String sql) {}

default PreviewResult previewAction(
ActionType actionType, TablePath tablePath, Optional<CatalogTable> catalogTable) {
throw new UnsupportedOperationException("Preview action is not supported");
}

enum ActionType {
CREATE_TABLE,
CREATE_DATABASE,
DROP_TABLE,
DROP_DATABASE,
TRUNCATE_TABLE
}

// todo: Support for update table metadata

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.table.catalog;

public class InfoPreviewResult extends PreviewResult {
private final String info;

public String getInfo() {
return info;
}

public InfoPreviewResult(String info) {
super(Type.INFO);
this.info = info;
}

@Override
public String toString() {
return info;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.table.catalog;

/** The result of a SQL preview action in {@link Catalog#previewAction}. */
public abstract class PreviewResult {

private final Type type;

public PreviewResult(Type type) {
this.type = type;
}

public Type getType() {
return type;
}

public enum Type {
SQL,
INFO,
OTHER
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.table.catalog;

public class SQLPreviewResult extends PreviewResult {

private final String sql;

public String getSql() {
return sql;
}

public SQLPreviewResult(String sql) {
super(Type.SQL);
this.sql = sql;
}

@Override
public String toString() {
return sql;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.PreviewResult;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.SQLPreviewResult;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
Expand Down Expand Up @@ -49,6 +51,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;

public class DorisCatalog implements Catalog {

Expand Down Expand Up @@ -339,8 +344,7 @@ public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
try {
if (ignoreIfNotExists) {
conn.createStatement()
.execute(String.format("TRUNCATE TABLE %s", tablePath.getFullName()));
conn.createStatement().execute(DorisCatalogUtil.getTruncateTableQuery(tablePath));
}
} catch (Exception e) {
throw new CatalogException(
Expand All @@ -359,4 +363,27 @@ public boolean isExistsData(TablePath tablePath) {
throw new CatalogException(String.format("Failed executeSql error %s", sql), e);
}
}

@Override
public PreviewResult previewAction(
ActionType actionType, TablePath tablePath, Optional<CatalogTable> catalogTable) {
if (actionType == ActionType.CREATE_TABLE) {
checkArgument(catalogTable.isPresent(), "CatalogTable cannot be null");
return new SQLPreviewResult(
DorisCatalogUtil.getCreateTableStatement(
dorisConfig.getCreateTableTemplate(), tablePath, catalogTable.get()));
} else if (actionType == ActionType.DROP_TABLE) {
return new SQLPreviewResult(DorisCatalogUtil.getDropTableQuery(tablePath, true));
} else if (actionType == ActionType.TRUNCATE_TABLE) {
return new SQLPreviewResult(DorisCatalogUtil.getTruncateTableQuery(tablePath));
} else if (actionType == ActionType.CREATE_DATABASE) {
return new SQLPreviewResult(
DorisCatalogUtil.getCreateDatabaseQuery(tablePath.getDatabaseName(), true));
} else if (actionType == ActionType.DROP_DATABASE) {
return new SQLPreviewResult(
DorisCatalogUtil.getDropDatabaseQuery(tablePath.getDatabaseName(), true));
} else {
throw new UnsupportedOperationException("Unsupported action type: " + actionType);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ public static String getDropTableQuery(TablePath tablePath, boolean ignoreIfNotE
return "DROP TABLE " + (ignoreIfNotExists ? "IF EXISTS " : "") + tablePath.getFullName();
}

public static String getTruncateTableQuery(TablePath tablePath) {
return "TRUNCATE TABLE " + tablePath.getFullName();
}

/**
* @param createTableTemplate create table template
* @param catalogTable catalog table
Expand Down
Loading

0 comments on commit c5feee8

Please sign in to comment.