From b2c742f059b1c7f3d455054a1c8f76b35390c8f7 Mon Sep 17 00:00:00 2001
From: huanghaibin <284824253@qq.com>
Date: Thu, 19 Sep 2024 21:07:15 +0800
Subject: [PATCH] [fix](cloud-mow) Add retry when calculating delete bitmap
timeout when loading data (#40562)
Add a retry when calculating the delete bitmap times out during broker load,
as stream load already does.
---
.../cloud_engine_calc_delete_bitmap_task.cpp | 1 +
.../java/org/apache/doris/common/Config.java | 2 +-
.../CloudGlobalTransactionMgr.java | 23 +-
.../doris/common/util/MetaLockUtils.java | 10 +-
.../doris/load/loadv2/BrokerLoadJob.java | 88 +++---
.../doris/load/loadv2/SparkLoadJob.java | 61 +++--
.../insert/AbstractInsertExecutor.java | 21 +-
.../test_cloud_mow_broker_load_with_retry.out | 7 +
.../test_cloud_mow_insert_with_retry.out | 15 ++
...st_cloud_mow_broker_load_with_retry.groovy | 251 ++++++++++++++++++
.../test_cloud_mow_insert_timeout.groovy | 2 +-
.../test_cloud_mow_insert_with_retry.groovy | 86 ++++++
12 files changed, 492 insertions(+), 75 deletions(-)
create mode 100644 regression-test/data/fault_injection_p0/cloud/test_cloud_mow_broker_load_with_retry.out
create mode 100644 regression-test/data/fault_injection_p0/cloud/test_cloud_mow_insert_with_retry.out
create mode 100644 regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_broker_load_with_retry.groovy
create mode 100644 regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_insert_with_retry.groovy
diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
index 33b9e51c7cbcf2..0aa40ed67ebda4 100644
--- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
+++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
@@ -95,6 +95,7 @@ Status CloudEngineCalcDeleteBitmapTask::execute() {
}
// wait for all finished
token->wait();
+ DBUG_EXECUTE_IF("CloudEngineCalcDeleteBitmapTask.execute.enable_wait", { sleep(3); });
LOG(INFO) << "finish to calculate delete bitmap on transaction."
<< "transaction_id=" << transaction_id << ", cost(us): " << watch.get_elapse_time_us()
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 71bb0e396f10d9..4bc093ebcd691c 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2964,7 +2964,7 @@ public static int metaServiceRpcRetryTimes() {
public static String security_checker_class_name = "";
@ConfField(mutable = true)
- public static int mow_insert_into_commit_retry_times = 10;
+ public static int mow_calculate_delete_bitmap_retry_times = 10;
@ConfField(mutable = true, description = {"指定S3 Load endpoint白名单, 举例: s3_load_endpoint_white_list=a,b,c",
"the white list for the s3 load endpoint, if it is empty, no white list will be set,"
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index e52a4c62957600..e64188b7c22e82 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -927,7 +927,28 @@ private void debugCalcDeleteBitmapRandomTimeout() throws UserException {
public boolean commitAndPublishTransaction(DatabaseIf db, List
tableList, long transactionId,
List tabletCommitInfos, long timeoutMillis)
throws UserException {
- return commitAndPublishTransaction(db, tableList, transactionId, tabletCommitInfos, timeoutMillis, null);
+ int retryTimes = 0;
+ boolean res = false;
+ while (true) {
+ try {
+ res = commitAndPublishTransaction(db, tableList, transactionId, tabletCommitInfos, timeoutMillis, null);
+ break;
+ } catch (UserException e) {
+ LOG.warn("failed to commit txn, txnId={},retryTimes={},exception={}",
+ transactionId, retryTimes, e);
+ // only mow table will catch DELETE_BITMAP_LOCK_ERR and need to retry
+ if (e.getErrorCode() == InternalErrorCode.DELETE_BITMAP_LOCK_ERR) {
+ retryTimes++;
+ if (retryTimes >= Config.mow_calculate_delete_bitmap_retry_times) {
+ // should throw exception after running out of retry times
+ throw e;
+ }
+ } else {
+ throw e;
+ }
+ }
+ }
+ return res;
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java
index 16afbcecdaea5c..ffd411d0cf3ea1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java
@@ -127,8 +127,18 @@ public static void writeUnlockTables(List extends TableIf> tableList) {
}
public static void commitLockTables(List tableList) {
- for (Table table : tableList) {
- table.commitLock();
+ // Take the commit lock of every table, in list order.
+ for (int i = 0; i < tableList.size(); i++) {
+ try {
+ tableList.get(i).commitLock();
+ } catch (Exception e) {
+ // Roll back: release the locks already taken (indexes 0..i-1) in reverse order.
+ for (int j = i - 1; j >= 0; j--) {
+ tableList.get(j).commitUnlock();
+ }
+ // Propagate the failure so the caller never assumes the locks are held.
+ throw e;
+ }
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index fb5f06fced570b..5e1b085b239474 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -29,6 +29,7 @@
import org.apache.doris.common.DataQualityException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.DuplicatedRequestException;
+import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.QuotaExceedException;
@@ -335,42 +336,59 @@ private void onLoadingTaskFinished(BrokerLoadingTaskAttachment attachment) {
}
Database db = null;
List tableList = null;
- try {
- db = getDb();
- tableList = db.getTablesOnIdOrderOrThrowException(Lists.newArrayList(fileGroupAggInfo.getAllTableIds()));
- if (Config.isCloudMode()) {
- MetaLockUtils.commitLockTables(tableList);
- } else {
- MetaLockUtils.writeLockTablesOrMetaException(tableList);
+ int retryTimes = 0;
+ while (true) {
+ try {
+ db = getDb();
+ tableList = db.getTablesOnIdOrderOrThrowException(
+ Lists.newArrayList(fileGroupAggInfo.getAllTableIds()));
+ if (Config.isCloudMode()) {
+ MetaLockUtils.commitLockTables(tableList);
+ } else {
+ MetaLockUtils.writeLockTablesOrMetaException(tableList);
+ }
+ } catch (MetaNotFoundException e) {
+ LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
+ .add("database_id", dbId)
+ .add("error_msg", "db has been deleted when job is loading")
+ .build(), e);
+ cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), true, true);
+ return;
}
- } catch (MetaNotFoundException e) {
- LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
- .add("database_id", dbId)
- .add("error_msg", "db has been deleted when job is loading")
- .build(), e);
- cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), true, true);
- return;
- }
- try {
- LOG.info(new LogBuilder(LogKey.LOAD_JOB, id)
- .add("txn_id", transactionId)
- .add("msg", "Load job try to commit txn")
- .build());
- Env.getCurrentGlobalTransactionMgr().commitTransaction(
- dbId, tableList, transactionId, commitInfos, getLoadJobFinalOperation());
- afterLoadingTaskCommitTransaction(tableList);
- afterCommit();
- } catch (UserException e) {
- LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
- .add("database_id", dbId)
- .add("error_msg", "Failed to commit txn with error:" + e.getMessage())
- .build(), e);
- cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), true, true);
- } finally {
- if (Config.isCloudMode()) {
- MetaLockUtils.commitUnlockTables(tableList);
- } else {
- MetaLockUtils.writeUnlockTables(tableList);
+ try {
+ LOG.info(new LogBuilder(LogKey.LOAD_JOB, id)
+ .add("txn_id", transactionId)
+ .add("msg", "Load job try to commit txn")
+ .build());
+ Env.getCurrentGlobalTransactionMgr().commitTransaction(
+ dbId, tableList, transactionId, commitInfos, getLoadJobFinalOperation());
+ afterLoadingTaskCommitTransaction(tableList);
+ afterCommit();
+ return;
+ } catch (UserException e) {
+ LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
+ .add("database_id", dbId)
+ .add("retry_times", retryTimes)
+ .add("error_msg", "Failed to commit txn with error:" + e.getMessage())
+ .build(), e);
+ if (e.getErrorCode() == InternalErrorCode.DELETE_BITMAP_LOCK_ERR) {
+ retryTimes++;
+ if (retryTimes >= Config.mow_calculate_delete_bitmap_retry_times) {
+ LOG.warn("cancelJob {} because up to max retry time,exception {}", id, e);
+ cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), true,
+ true);
+ return;
+ }
+ } else {
+ cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), true, true);
+ return;
+ }
+ } finally {
+ if (Config.isCloudMode()) {
+ MetaLockUtils.commitUnlockTables(tableList);
+ } else {
+ MetaLockUtils.writeUnlockTables(tableList);
+ }
}
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index 1f1c71d7a903d2..f01f205e96dc0d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -45,9 +45,11 @@
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
import org.apache.doris.common.DataQualityException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.DuplicatedRequestException;
+import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
@@ -656,21 +658,50 @@ public void updateLoadingStatus() throws UserException {
}
private void tryCommitJob() throws UserException {
- LOG.info(new LogBuilder(LogKey.LOAD_JOB, id).add("txn_id", transactionId)
- .add("msg", "Load job try to commit txn").build());
- Database db = getDb();
- List tableList = db.getTablesOnIdOrderOrThrowException(
- Lists.newArrayList(tableToLoadPartitions.keySet()));
- MetaLockUtils.writeLockTablesOrMetaException(tableList);
- try {
- Env.getCurrentGlobalTransactionMgr().commitTransaction(
- dbId, tableList, transactionId, commitInfos,
- new LoadJobFinalOperation(id, loadingStatus, progress, loadStartTimestamp,
- finishTimestamp, state, failMsg));
- } catch (TabletQuorumFailedException e) {
- // retry in next loop
- } finally {
- MetaLockUtils.writeUnlockTables(tableList);
+ int retryTimes = 0;
+ while (true) {
+ Database db = getDb();
+ List tableList = db.getTablesOnIdOrderOrThrowException(
+ Lists.newArrayList(tableToLoadPartitions.keySet()));
+ if (Config.isCloudMode()) {
+ MetaLockUtils.commitLockTables(tableList);
+ } else {
+ MetaLockUtils.writeLockTablesOrMetaException(tableList);
+ }
+ try {
+ LOG.info(new LogBuilder(LogKey.LOAD_JOB, id).add("txn_id", transactionId)
+ .add("msg", "Load job try to commit txn").build());
+ Env.getCurrentGlobalTransactionMgr().commitTransaction(
+ dbId, tableList, transactionId, commitInfos,
+ new LoadJobFinalOperation(id, loadingStatus, progress, loadStartTimestamp,
+ finishTimestamp, state, failMsg));
+ return;
+ } catch (TabletQuorumFailedException e) {
+ // retry in next loop
+ return;
+ } catch (UserException e) {
+ LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
+ .add("txn_id", transactionId)
+ .add("database_id", dbId)
+ .add("retry_times", retryTimes)
+ .add("error_msg", "Failed to commit txn with error:" + e.getMessage())
+ .build(), e);
+ if (e.getErrorCode() == InternalErrorCode.DELETE_BITMAP_LOCK_ERR) {
+ retryTimes++;
+ if (retryTimes >= Config.mow_calculate_delete_bitmap_retry_times) {
+ LOG.warn("cancelJob {} because up to max retry time, exception {}", id, e);
+ throw e;
+ }
+ } else {
+ throw e;
+ }
+ } finally {
+ if (Config.isCloudMode()) {
+ MetaLockUtils.commitUnlockTables(tableList);
+ } else {
+ MetaLockUtils.writeUnlockTables(tableList);
+ }
+ }
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
index cdf74f5e9aca3a..cafffab295eba1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
@@ -24,7 +24,6 @@
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.nereids.NereidsPlanner;
@@ -193,25 +192,7 @@ public void executeSingleInsert(StmtExecutor executor, long jobId) throws Except
executor.updateProfile(false);
execImpl(executor, jobId);
checkStrictModeAndFilterRatio();
- int retryTimes = 0;
- while (true) {
- try {
- onComplete();
- break;
- } catch (UserException e) {
- LOG.warn("failed to commit txn, txnId={}, jobId={}, retryTimes={}",
- getTxnId(), jobId, retryTimes, e);
- if (e.getErrorCode() == InternalErrorCode.DELETE_BITMAP_LOCK_ERR) {
- retryTimes++;
- if (retryTimes >= Config.mow_insert_into_commit_retry_times) {
- // should throw exception after running out of retry times
- throw e;
- }
- } else {
- throw e;
- }
- }
- }
+ onComplete();
} catch (Throwable t) {
onFail(t);
// retry insert into from select when meet E-230 in cloud
diff --git a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_broker_load_with_retry.out b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_broker_load_with_retry.out
new file mode 100644
index 00000000000000..9369fd5ae32f4f
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_broker_load_with_retry.out
@@ -0,0 +1,7 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select --
+19
+
+-- !select --
+19
+
diff --git a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_insert_with_retry.out b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_insert_with_retry.out
new file mode 100644
index 00000000000000..979483692d31dc
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_insert_with_retry.out
@@ -0,0 +1,15 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+1 1 1
+2 2 2
+
+-- !sql --
+1 1 1
+
+-- !sql --
+1 1 1
+2 2 2
+
+-- !sql --
+1 1 1
+
diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_broker_load_with_retry.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_broker_load_with_retry.groovy
new file mode 100644
index 00000000000000..035a6307d46e20
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_broker_load_with_retry.groovy
@@ -0,0 +1,251 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_cloud_mow_broker_load_with_retry", "nonConcurrent") {
+ if (!isCloudMode()) {
+ return
+ }
+
+ def s3BucketName = getS3BucketName()
+ def s3Endpoint = getS3Endpoint()
+ def s3Region = getS3Region()
+
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+
+ def customFeConfig = [
+ calculate_delete_bitmap_task_timeout_seconds: 2
+ ]
+
+ def table = "tbl_basic"
+ setFeConfigTemporary(customFeConfig) {
+
+ def attributesList = [
+
+ ]
+
+ /* ========================================================== normal ========================================================== */
+ attributesList.add(new LoadAttributes("s3://${s3BucketName}/regression/load/data/basic_data.csv",
+ "${table}", "LINES TERMINATED BY \"\n\"", "COLUMNS TERMINATED BY \"|\"", "FORMAT AS \"CSV\"", "(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18)",
+ "", "", "", "", ""))
+
+ attributesList.add(new LoadAttributes("s3://${s3BucketName}/regression/load/data/basic_data.csv",
+ "${table}", "LINES TERMINATED BY \"\n\"", "COLUMNS TERMINATED BY \"|\"", "FORMAT AS \"CSV\"", "(K00,K01,K02,K03,K04,K05,K06,K07,K08,K09,K10,K11,K12,K13,K14,K15,K16,K17,K18)",
+ "", "", "", "", ""))
+ def ak = getS3AK()
+ def sk = getS3SK()
+ try {
+ sql """ DROP TABLE IF EXISTS ${table} """
+ sql """
+ CREATE TABLE ${table}
+ (
+ k00 INT NOT NULL,
+ k01 DATE NOT NULL,
+ k02 BOOLEAN NULL,
+ k03 TINYINT NULL,
+ k04 SMALLINT NULL,
+ k05 INT NULL,
+ k06 BIGINT NULL,
+ k07 LARGEINT NULL,
+ k08 FLOAT NULL,
+ k09 DOUBLE NULL,
+ k10 DECIMAL(9,1) NULL,
+ k11 DECIMALV3(9,1) NULL,
+ k12 DATETIME NULL,
+ k13 DATEV2 NULL,
+ k14 DATETIMEV2 NULL,
+ k15 CHAR NULL,
+ k16 VARCHAR NULL,
+ k17 STRING NULL,
+ k18 JSON NULL,
+ kd01 BOOLEAN NOT NULL DEFAULT "TRUE",
+ kd02 TINYINT NOT NULL DEFAULT "1",
+ kd03 SMALLINT NOT NULL DEFAULT "2",
+ kd04 INT NOT NULL DEFAULT "3",
+ kd05 BIGINT NOT NULL DEFAULT "4",
+ kd06 LARGEINT NOT NULL DEFAULT "5",
+ kd07 FLOAT NOT NULL DEFAULT "6.0",
+ kd08 DOUBLE NOT NULL DEFAULT "7.0",
+ kd09 DECIMAL NOT NULL DEFAULT "888888888",
+ kd10 DECIMALV3 NOT NULL DEFAULT "999999999",
+ kd11 DATE NOT NULL DEFAULT "2023-08-24",
+ kd12 DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ kd13 DATEV2 NOT NULL DEFAULT "2023-08-24",
+ kd14 DATETIMEV2 NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ kd15 CHAR(255) NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+ kd16 VARCHAR(300) NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+ kd17 STRING NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+ kd18 JSON NULL,
+
+ INDEX idx_inverted_k104 (`k05`) USING INVERTED,
+ INDEX idx_inverted_k110 (`k11`) USING INVERTED,
+ INDEX idx_inverted_k113 (`k13`) USING INVERTED,
+ INDEX idx_inverted_k114 (`k14`) USING INVERTED,
+ INDEX idx_inverted_k117 (`k17`) USING INVERTED PROPERTIES("parser" = "english"),
+ INDEX idx_ngrambf_k115 (`k15`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+ INDEX idx_ngrambf_k116 (`k16`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+ INDEX idx_ngrambf_k117 (`k17`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+
+ INDEX idx_bitmap_k104 (`k02`) USING BITMAP,
+ INDEX idx_bitmap_k110 (`kd01`) USING BITMAP
+
+ )
+ UNIQUE KEY(k00)
+ DISTRIBUTED BY HASH(k00) BUCKETS 1
+ PROPERTIES (
+ "enable_unique_key_merge_on_write" = "true",
+ "bloom_filter_columns"="k05",
+ "replication_num" = "1"
+ );
+ """
+ GetDebugPoint().enableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait")
+ def i = 0
+ for (LoadAttributes attributes : attributesList) {
+ def label = "test_s3_load_" + UUID.randomUUID().toString().replace("-", "_") + "_" + i
+ attributes.label = label
+ def prop = attributes.getPropertiesStr()
+
+ def sql_str = """
+ LOAD LABEL $label (
+ $attributes.dataDesc.mergeType
+ DATA INFILE("$attributes.dataDesc.path")
+ INTO TABLE $attributes.dataDesc.tableName
+ $attributes.dataDesc.columnTermClause
+ $attributes.dataDesc.lineTermClause
+ $attributes.dataDesc.formatClause
+ $attributes.dataDesc.columns
+ $attributes.dataDesc.columnsFromPathClause
+ $attributes.dataDesc.columnMappingClause
+ $attributes.dataDesc.precedingFilterClause
+ $attributes.dataDesc.orderByClause
+ $attributes.dataDesc.whereExpr
+ )
+ WITH S3 (
+ "AWS_ACCESS_KEY" = "$ak",
+ "AWS_SECRET_KEY" = "$sk",
+ "AWS_ENDPOINT" = "${s3Endpoint}",
+ "AWS_REGION" = "${s3Region}",
+ "use_path_style" = "$attributes.usePathStyle",
+ "provider" = "${getS3Provider()}"
+ )
+ ${prop}
+ """
+ logger.info("submit sql: ${sql_str}");
+ sql """${sql_str}"""
+ logger.info("Submit load with lable: $label, table: $attributes.dataDesc.tableName, path: $attributes.dataDesc.path")
+
+ def max_try_milli_secs = 600000
+ while (max_try_milli_secs > 0) {
+ String[][] result = sql """ show load where label="$attributes.label" order by createtime desc limit 1; """
+ if (result[0][2].equals("FINISHED")) {
+ if (attributes.isExceptFailed) {
+ assertTrue(false, "load should be failed but was success: $result")
+ }
+ logger.info("Load FINISHED " + attributes.label + ": $result")
+ break
+ }
+ if (result[0][2].equals("CANCELLED")) {
+ if (attributes.isExceptFailed) {
+ logger.info("Load FINISHED " + attributes.label)
+ break
+ }
+ assertTrue(false, "load failed: $result")
+ break
+ }
+ Thread.sleep(1000)
+ max_try_milli_secs -= 1000
+ if (max_try_milli_secs <= 0) {
+ assertTrue(false, "load Timeout: $attributes.label")
+ }
+ }
+ qt_select """ select count(*) from $attributes.dataDesc.tableName """
+ ++i
+ }
+ } finally {
+ GetDebugPoint().disableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait") // enabled on the BEs above, so disable it on the BEs
+ sql "DROP TABLE IF EXISTS ${table};"
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+ }
+
+}
+
+class DataDesc {
+ public String mergeType = ""
+ public String path
+ public String tableName
+ public String lineTermClause
+ public String columnTermClause
+ public String formatClause
+ public String columns
+ public String columnsFromPathClause
+ public String precedingFilterClause
+ public String columnMappingClause
+ public String whereExpr
+ public String orderByClause
+}
+
+class LoadAttributes {
+ LoadAttributes(String path, String tableName, String lineTermClause, String columnTermClause, String formatClause,
+ String columns, String columnsFromPathClause, String precedingFilterClause, String columnMappingClause, String whereExpr, String orderByClause, boolean isExceptFailed = false) {
+ this.dataDesc = new DataDesc()
+ this.dataDesc.path = path
+ this.dataDesc.tableName = tableName
+ this.dataDesc.lineTermClause = lineTermClause
+ this.dataDesc.columnTermClause = columnTermClause
+ this.dataDesc.formatClause = formatClause
+ this.dataDesc.columns = columns
+ this.dataDesc.columnsFromPathClause = columnsFromPathClause
+ this.dataDesc.precedingFilterClause = precedingFilterClause
+ this.dataDesc.columnMappingClause = columnMappingClause
+ this.dataDesc.whereExpr = whereExpr
+ this.dataDesc.orderByClause = orderByClause
+
+ this.isExceptFailed = isExceptFailed
+
+ properties = new HashMap<>()
+ }
+
+ LoadAttributes addProperties(String k, String v) {
+ properties.put(k, v)
+ return this
+ }
+
+ String getPropertiesStr() {
+ if (properties.isEmpty()) {
+ return ""
+ }
+ String prop = "PROPERTIES ("
+ properties.forEach (k, v) -> {
+ prop += "\"${k}\" = \"${v}\","
+ }
+ prop = prop.substring(0, prop.size() - 1)
+ prop += ")"
+ return prop
+ }
+
+ LoadAttributes withPathStyle() {
+ usePathStyle = "true"
+ return this
+ }
+
+ public DataDesc dataDesc
+ public Map properties
+ public String label
+ public String usePathStyle = "false"
+ public boolean isExceptFailed
+}
\ No newline at end of file
diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_insert_timeout.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_insert_timeout.groovy
index 23d92f31e5ad8e..7baf18c772290f 100644
--- a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_insert_timeout.groovy
+++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_insert_timeout.groovy
@@ -50,7 +50,7 @@ suite("test_cloud_mow_insert_timeout", "nonConcurrent") {
def customFeConfig = [
delete_bitmap_lock_expiration_seconds : 5,
calculate_delete_bitmap_task_timeout_seconds : 2,
- mow_insert_into_commit_retry_times : 2
+ mow_calculate_delete_bitmap_retry_times : 2
]
setFeConfigTemporary(customFeConfig) {
diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_insert_with_retry.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_insert_with_retry.groovy
new file mode 100644
index 00000000000000..f7038b80e426fc
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_insert_with_retry.groovy
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_cloud_mow_insert_with_retry", "nonConcurrent") {
+ if (!isCloudMode()) {
+ return
+ }
+
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+
+ def customFeConfig = [
+ calculate_delete_bitmap_task_timeout_seconds: 2
+ ]
+ def dbName = "regression_test_fault_injection_p0_cloud"
+ def table1 = dbName + ".test_cloud_mow_insert_with_retry"
+ setFeConfigTemporary(customFeConfig) {
+ for (item in ["legacy", "nereids"]) {
+ try {
+ GetDebugPoint().enableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait")
+ sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+ sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+ `k1` int NOT NULL,
+ `c1` int,
+ `c2` int
+ )UNIQUE KEY(k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES (
+ "enable_unique_key_merge_on_write" = "true",
+ "disable_auto_compaction" = "true",
+ "replication_num" = "1"); """
+ connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl + "&useLocalSessionState=true") {
+ if (item == "nereids") {
+ sql """ set enable_nereids_planner=true; """
+ sql """ set enable_fallback_to_original_planner=false; """
+ } else {
+ sql """ set enable_nereids_planner = false; """
+ }
+ def timeout = 2000
+ def now = System.currentTimeMillis()
+ sql "insert into ${table1} values(1,1,1);"
+ def time_diff = System.currentTimeMillis() - now
+ logger.info("time_diff:" + time_diff)
+ assertTrue(time_diff > timeout, "insert or delete should take over " + timeout + " ms")
+
+ now = System.currentTimeMillis()
+ sql "insert into ${table1} values(2,2,2);"
+ time_diff = System.currentTimeMillis() - now
+ logger.info("time_diff:" + time_diff)
+ assertTrue(time_diff > timeout, "insert or delete should take over " + timeout + " ms")
+ order_qt_sql "select * from ${table1};"
+
+ now = System.currentTimeMillis()
+ sql "delete from ${table1} where k1=2;"
+ time_diff = System.currentTimeMillis() - now
+ logger.info("time_diff:" + time_diff)
+ assertTrue(time_diff > timeout, "insert or delete should take over " + timeout + " ms")
+ order_qt_sql "select * from ${table1};"
+ }
+ } catch (Exception e) {
+ logger.info(e.getMessage())
+ throw e
+ } finally {
+ GetDebugPoint().disableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait") // enabled on the BEs above, so disable it on the BEs
+ sql "DROP TABLE IF EXISTS ${table1};"
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+ }
+
+ }
+
+}
\ No newline at end of file