[feature](selectdb-cloud) Copy into support force (apache#1081)
mymeiyi committed Nov 16, 2022
1 parent 7c4e080 commit 658af85
Showing 10 changed files with 90 additions and 31 deletions.
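
For orientation, a minimal usage sketch of the new option, modeled on the regression test added in this commit (table, stage, and file names are placeholders):

    -- Re-load a file even if it was copied before: with 'copy.force' = 'true' the
    -- loaded-file check is skipped and the copy job is not recorded in the meta
    -- service, so the same file may end up in the table more than once.
    copy into customer
    from @external_stage('prefix/customer.csv.gz')
    properties ('copy.async' = 'false', 'copy.force' = 'true');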
@@ -41,7 +41,7 @@ public class CopyIntoProperties extends CopyProperties {
.build();

private static final ImmutableSet<String> COPY_PROPERTIES = new ImmutableSet.Builder<String>()
.add(TYPE).add(COMPRESSION).add(COLUMN_SEPARATOR).add(SIZE_LIMIT).add(ON_ERROR).add(ASYNC)
.add(TYPE).add(COMPRESSION).add(COLUMN_SEPARATOR).add(SIZE_LIMIT).add(ON_ERROR).add(ASYNC).add(FORCE)
.addAll(DATA_DESC_PROPERTIES).addAll(EXEC_PROPERTIES).build();

public CopyIntoProperties(Map<String, String> properties) {
@@ -55,6 +55,7 @@ public void analyze() throws AnalysisException {
analyzeAsync();
analyzeStrictMode();
analyzeLoadParallelism();
analyzeForce();
for (Entry<String, String> entry : properties.entrySet()) {
if (!COPY_PROPERTIES.contains(entry.getKey())) {
throw new AnalysisException("Property '" + entry.getKey() + "' is invalid");
@@ -51,6 +51,9 @@ public class CopyProperties {
public static final String ON_ERROR_MAX_FILTER_RATIO = LoadStmt.MAX_FILTER_RATIO_PROPERTY + "_";
public static final String STRICT_MODE = COPY_PREFIX + LoadStmt.STRICT_MODE;
public static final String LOAD_PARALLELISM = COPY_PREFIX + LoadStmt.LOAD_PARALLELISM;
// If 'copy.force' is true, files are loaded into the table without checking whether they have already been loaded,
// and the copy job is not recorded in the meta service, so the same file may be copied into a table multiple times.
public static final String FORCE = COPY_PREFIX + "force";

public CopyProperties(Map<String, String> properties, String prefix) {
this.properties = properties;
@@ -118,6 +121,10 @@ protected void analyzeStrictMode() throws AnalysisException {
analyzeBooleanProperty(STRICT_MODE);
}

protected void analyzeForce() throws AnalysisException {
analyzeBooleanProperty(FORCE);
}

private void analyzeBooleanProperty(String keyWithoutPrefix) throws AnalysisException {
String key = addKeyPrefix(keyWithoutPrefix);
if (properties.containsKey(key)) {
@@ -173,6 +180,11 @@ public boolean isAsync() {
return properties.containsKey(key) ? Boolean.parseBoolean(properties.get(key)) : true;
}

public boolean isForce() {
String key = addKeyPrefix(FORCE);
return properties.containsKey(key) ? Boolean.parseBoolean(properties.get(key)) : false;
}

public Map<String, String> getProperties() {
return properties;
}
19 changes: 12 additions & 7 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/CopyStmt.java
@@ -86,11 +86,6 @@ public class CopyStmt extends DdlStmt {
@Getter
private ObjectInfo objectInfo;

@Getter
private long sizeLimit;
@Getter
private boolean async;

/**
* Use for cup.
*/
@@ -138,7 +133,6 @@ public void analyze(Analyzer analyzer) throws UserException {
analyzeStagePB(stagePB);

// generate broker desc
sizeLimit = copyIntoProperties.getSizeLimit();
brokerDesc = new BrokerDesc("S3", StorageBackend.StorageType.S3, brokerProperties);
// generate data description
String filePath = "s3://" + brokerProperties.get(S3_BUCKET) + "/" + brokerProperties.get(S3_PREFIX);
@@ -196,7 +190,6 @@ private void analyzeStagePB(StagePB stagePB) throws AnalysisException {
StageProperties stageProperties = new StageProperties(stagePB.getPropertiesMap());
this.copyIntoProperties.mergeProperties(stageProperties);
this.copyIntoProperties.analyze();
this.async = this.copyIntoProperties.isAsync();
}

public String getDbName() {
@@ -219,6 +212,18 @@ public LabelName getLabel() {
return label;
}

public long getSizeLimit() {
return this.copyIntoProperties.getSizeLimit();
}

public boolean isAsync() {
return this.copyIntoProperties.isAsync();
}

public boolean isForce() {
return this.copyIntoProperties.isForce();
}

@Override
public String toSql() {
StringBuilder sb = new StringBuilder();
14 changes: 10 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/load/loadv2/CopyJob.java
@@ -69,6 +69,8 @@ public class CopyJob extends BrokerLoadJob {
private ObjectInfo objectInfo;
@Getter
private String copyId;
@Getter
private boolean forceCopy;
private String loadFilePaths = "";
private Map<String, String> properties = new HashMap<>();
private volatile boolean abortedCopy = false;
@@ -79,13 +81,14 @@ public CopyJob() {

public CopyJob(long dbId, String label, TUniqueId queryId, BrokerDesc brokerDesc, OriginStatement originStmt,
UserIdentity userInfo, String stageId, StagePB.StageType stageType, long sizeLimit, String pattern,
ObjectInfo objectInfo) throws MetaNotFoundException {
ObjectInfo objectInfo, boolean forceCopy) throws MetaNotFoundException {
super(EtlJobType.COPY, dbId, label, brokerDesc, originStmt, userInfo);
this.stageId = stageId;
this.stageType = stageType;
this.sizeLimit = sizeLimit;
this.pattern = pattern;
this.objectInfo = objectInfo;
this.forceCopy = forceCopy;
this.copyId = DebugUtil.printId(queryId);
}

@@ -106,6 +109,9 @@ protected LoadTask createPendingTask() {
@Override
protected void afterCommit() throws DdlException {
super.afterCommit();
if (forceCopy) {
return;
}
for (Entry<FileGroupAggKey, List<BrokerFileGroup>> entry : fileGroupAggInfo.getAggKeyToFileGroups()
.entrySet()) {
long tableId = entry.getKey().getTableId();
@@ -117,7 +123,7 @@ protected void afterCommit() throws DdlException {
@Override
public void cancelJob(FailMsg failMsg) throws DdlException {
super.cancelJob(failMsg);
if (abortedCopy) {
if (forceCopy || abortedCopy) {
return;
}
for (Entry<FileGroupAggKey, List<BrokerFileGroup>> entry : fileGroupAggInfo.getAggKeyToFileGroups()
@@ -132,7 +138,7 @@ public void cancelJob(FailMsg failMsg) throws DdlException {
@Override
public void cancelJobWithoutCheck(FailMsg failMsg, boolean abortTxn, boolean needLog) {
super.cancelJobWithoutCheck(failMsg, abortTxn, needLog);
if (abortedCopy) {
if (forceCopy || abortedCopy) {
return;
}
abortCopy();
@@ -141,7 +147,7 @@ public void cancelJobWithoutCheck(FailMsg failMsg, boolean abortTxn, boolean needLog) {
@Override
protected void unprotectedExecuteCancel(FailMsg failMsg, boolean abortTxn) {
super.unprotectedExecuteCancel(failMsg, abortTxn);
if (abortedCopy) {
if (forceCopy || abortedCopy) {
return;
}
abortCopy();
@@ -98,7 +98,8 @@ protected void getAllFileStatus() throws UserException {
LOG.debug("input path = {}", path);
parseFileForCopyJob(copyJob.getStageId(), fileGroup.getTableId(), copyJob.getCopyId(),
copyJob.getPattern(), copyJob.getSizeLimit(), Config.max_file_num_per_copy_into_job,
Config.max_meta_size_per_copy_into_job, fileStatuses, copyJob.getObjectInfo());
Config.max_meta_size_per_copy_into_job, fileStatuses, copyJob.getObjectInfo(),
copyJob.isForceCopy());
}
boolean isBinaryFileFormat = fileGroup.isBinaryFileFormat();
List<Pair<TBrokerFileStatus, ObjectFilePB>> filteredFileStatuses = Lists.newArrayList();
@@ -161,9 +162,10 @@ private void beginCopy(BrokerPendingTaskAttachment attachment) throws UserException {
List<ObjectFilePB> objectFiles = value.stream().flatMap(List::stream).map(l -> l.second)
.collect(Collectors.toList());
// groupId is 0 because the tableId is unique in FileGroupAggKey(copy into can't set partition now)
List<ObjectFilePB> filteredObjectFiles = Env.getCurrentInternalCatalog()
.beginCopy(copyJob.getStageId(), copyJob.getStageType(), fileGroupAggKey.getTableId(),
copyJob.getCopyId(), 0, startTime, timeoutTime, objectFiles);
List<ObjectFilePB> filteredObjectFiles = copyJob.isForceCopy() ? objectFiles
: Env.getCurrentInternalCatalog()
.beginCopy(copyJob.getStageId(), copyJob.getStageType(), fileGroupAggKey.getTableId(),
copyJob.getCopyId(), 0, startTime, timeoutTime, objectFiles);
if (filteredObjectFiles.isEmpty()) {
retryTime = 0;
LOG.warn(NO_FILES_ERROR_MSG + ", matched {} files, filtered {} files "
@@ -214,8 +216,9 @@ private void beginCopy(BrokerPendingTaskAttachment attachment) throws UserException {

protected void parseFileForCopyJob(String stageId, long tableId, String copyId, String pattern, long sizeLimit,
int fileNumLimit, int fileMetaSizeLimit, List<Pair<TBrokerFileStatus, ObjectFilePB>> fileStatus,
ObjectInfo objectInfo) throws UserException {
List<ObjectFilePB> copiedFiles = Env.getCurrentInternalCatalog().getCopyFiles(stageId, tableId);
ObjectInfo objectInfo, boolean forceCopy) throws UserException {
List<ObjectFilePB> copiedFiles = forceCopy ? new ArrayList<>()
: Env.getCurrentInternalCatalog().getCopyFiles(stageId, tableId);
if (LOG.isDebugEnabled()) {
LOG.debug("Get copy files for stage={}, table={}, size={}", stageId, tableId, copiedFiles.size());
for (ObjectFilePB copyFile : copiedFiles) {
@@ -148,7 +148,7 @@ public LoadJob createLoadJobFromStmt(CopyStmt stmt) throws DdlException {
}
loadJob = new CopyJob(dbId, stmt.getLabel().getLabelName(), ConnectContext.get().queryId(),
stmt.getBrokerDesc(), stmt.getOrigStmt(), stmt.getUserInfo(), stmt.getStageId(),
stmt.getStageType(), stmt.getSizeLimit(), stmt.getPattern(), stmt.getObjectInfo());
stmt.getStageType(), stmt.getSizeLimit(), stmt.getPattern(), stmt.getObjectInfo(), stmt.isForce());
loadJob.setJobProperties(stmt.getProperties());
loadJob.checkAndSetDataSourceInfo(database, stmt.getDataDescriptions());
createLoadJob(loadJob);
28 changes: 25 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -407,9 +407,7 @@ private static void executeCopyStmt(Env env, CopyStmt copyStmt) throws Exception
CopyJob job = (CopyJob) env.getLoadManager().createLoadJobFromStmt(copyStmt);
if (!copyStmt.isAsync()) {
// wait for execute finished
while (!job.isCompleted()) {
Thread.sleep(5000);
}
waitJobCompleted(job);
if (job.getState() == JobState.UNKNOWN || job.getState() == JobState.CANCELLED) {
QueryState queryState = new QueryState();
FailMsg failMsg = job.getFailMsg();
@@ -463,4 +461,28 @@ private static void executeCopyStmt(Env env, CopyStmt copyStmt) throws Exception
queryState.setResultSet(new ShowResultSet(copyStmt.getMetaData(), result));
copyStmt.getAnalyzer().getContext().setState(queryState);
}

private static void waitJobCompleted(CopyJob job) throws InterruptedException {
// Check whether the job has completed, polling with an increasing sleep interval:
// sleep 10ms, 1000 times(10s)
// sleep 100ms, 1000 times(100s + 10s = 110s)
// sleep 1000ms, 1000 times(1000s + 110s = 1110s)
// sleep 5000ms...
long retry = 0;
long currentInterval = 10;
while (!job.isCompleted()) {
Thread.sleep(currentInterval);
if (retry > 3010) {
continue;
}
retry++;
if (retry > 3000) {
currentInterval = 5000;
} else if (retry > 2000) {
currentInterval = 1000;
} else if (retry > 1000) {
currentInterval = 100;
}
}
}
}
@@ -144,47 +144,47 @@ public void testParseFileForCopyJob() throws Exception {
String pattern = pair.first;
List<Pair<TBrokerFileStatus, ObjectFilePB>> fileStatus = new ArrayList<>();
task.parseFileForCopyJob(stageId, tableId, "q1", pattern, sizeLimit, fileNumLimit, fileMetaSizeLimit,
fileStatus, objectInfo);
fileStatus, objectInfo, false);
Assert.assertEquals(pair.second.intValue(), fileStatus.size());
}
// test loaded files is not empty
do {
String pattern = null;
List<Pair<TBrokerFileStatus, ObjectFilePB>> fileStatus = new ArrayList<>();
task.parseFileForCopyJob(stageId, 200, "q1", pattern, sizeLimit, fileNumLimit, fileMetaSizeLimit,
fileStatus, objectInfo);
fileStatus, objectInfo, false);
Assert.assertEquals(9, fileStatus.size());
} while (false);
// test size limit
do {
String pattern = null;
List<Pair<TBrokerFileStatus, ObjectFilePB>> fileStatus = new ArrayList<>();
task.parseFileForCopyJob(stageId, tableId, "q1", pattern, 100, fileNumLimit, fileMetaSizeLimit, fileStatus,
objectInfo);
objectInfo, false);
Assert.assertEquals(4, fileStatus.size());
} while (false);
// test file num limit
do {
String pattern = null;
List<Pair<TBrokerFileStatus, ObjectFilePB>> fileStatus = new ArrayList<>();
task.parseFileForCopyJob(stageId, tableId, "q1", pattern, sizeLimit, 6, fileMetaSizeLimit, fileStatus,
objectInfo);
objectInfo, false);
Assert.assertEquals(6, fileStatus.size());
} while (false);
// test file meta size limit
do {
String pattern = null;
List<Pair<TBrokerFileStatus, ObjectFilePB>> fileStatus = new ArrayList<>();
task.parseFileForCopyJob(stageId, tableId, "q1", pattern, sizeLimit, fileNumLimit, 60, fileStatus,
objectInfo);
objectInfo, false);
Assert.assertEquals(2, fileStatus.size());
} while (false);
// test size and file num limit
do {
String pattern = null;
List<Pair<TBrokerFileStatus, ObjectFilePB>> fileStatus = new ArrayList<>();
task.parseFileForCopyJob(stageId, tableId, "q1", pattern, 100, fileNumLimit, fileMetaSizeLimit, fileStatus,
objectInfo);
objectInfo, false);
Assert.assertEquals(4, fileStatus.size());
} while (false);
}
@@ -248,7 +248,7 @@ public void testContinuationToken() throws Exception {
String pattern = pair.first;
List<Pair<TBrokerFileStatus, ObjectFilePB>> fileStatus = new ArrayList<>();
task.parseFileForCopyJob(stageId, tableId, "q1", pattern, sizeLimit, fileNumLimit, fileMetaSizeLimit,
fileStatus, objectInfo);
fileStatus, objectInfo, false);
Assert.assertTrue("expected: " + pair.second * 2 + ", real: " + fileStatus.size(),
pair.second * 2 == fileStatus.size());
}
@@ -344,7 +344,7 @@ public void testParseFileForCopyJobV2() throws Exception {

List<Pair<TBrokerFileStatus, ObjectFilePB>> fileStatus = new ArrayList<>();
task.parseFileForCopyJob(stageId, tableId, "q1", pattern, sizeLimit, fileNumLimit, fileMetaSizeLimit,
fileStatus, objectInfo);
fileStatus, objectInfo, false);
Assert.assertEquals("pattern=" + pattern, pair.second.intValue(), fileStatus.size());
}
}
3 changes: 3 additions & 0 deletions regression-test/data/cloud/copy_into/test_external_stage.out
@@ -5,3 +5,6 @@
-- !sql --
150000

-- !sql --
300000

@@ -16,7 +16,7 @@ suite("test_external_stage") {
C_MKTSEGMENT CHAR(10) NOT NULL,
C_COMMENT VARCHAR(117) NOT NULL
)
UNIQUE KEY(C_CUSTKEY)
DUPLICATE KEY(C_CUSTKEY)
DISTRIBUTED BY HASH(C_CUSTKEY) BUCKETS 1
"""

@@ -45,6 +45,13 @@
assertTrue(result[0].size() == 8)
assertTrue(result[0][1].equals("CANCELLED"), "Finish copy into, state=" + result[0][1] + ", expected state=CANCELLED")
qt_sql " SELECT COUNT(*) FROM ${tableName}; "

result = sql " copy into ${tableName} from @${externalStageName}('${prefix}/customer.csv.gz') properties ('copy.async' = 'false', 'copy.force'='true'); "
logger.info("copy result: " + result)
assertTrue(result.size() == 1)
assertTrue(result[0].size() == 8)
assertTrue(result[0][1].equals("FINISHED"))
qt_sql " SELECT COUNT(*) FROM ${tableName}; "
} finally {
try_sql("DROP TABLE IF EXISTS ${tableName}")
}