Routine load: delay re-scheduling of tasks that were perceived as EOF.

Review notes on this revision of the patch:
- RoutineLoadTaskScheduler.process: the elapsed time is measured in
  milliseconds, while getMaxBatchIntervalS() is presumably in seconds (per its
  "S" suffix - confirm), so the interval is converted to milliseconds before
  the comparison.
- RoutineLoadTaskInfo.judgeEof: isEof is recomputed on every commit instead of
  only ever being set to true; KafkaTaskInfo copies the flag into the renewed
  task, so a stale "true" would keep throttling tasks that load full batches.
- NOTE(review): generic type arguments in context lines (e.g. "Map", "List")
  look stripped by extraction and the index hashes predate these edits;
  verify against the tree before applying.

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index dedb66d5f9458b..b48c05d74344f4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -58,6 +58,7 @@ public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo, Map partitionId
                 kafkaTaskInfo.getTimeoutMs(), kafkaTaskInfo.getTimeoutBackOffCount(),
                 kafkaTaskInfo.getBeId(), isMultiTable);
         this.partitionIdToOffset = partitionIdToOffset;
+        this.isEof = kafkaTaskInfo.getIsEof();
     }
 
     public List getPartitions() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 5ca291aa8d7fdf..ea9b4c29d0650b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -1333,7 +1333,7 @@ private void executeTaskOnTxnStatusChanged(RoutineLoadTaskInfo routineLoadTaskIn
         } else if (checkCommitInfo(rlTaskTxnCommitAttachment, txnState, txnStatusChangeReason)) {
             // step2: update job progress
             updateProgress(rlTaskTxnCommitAttachment);
-            routineLoadTaskInfo.selfAdaptTimeout(rlTaskTxnCommitAttachment);
+            routineLoadTaskInfo.handleTaskByTxnCommitAttachment(rlTaskTxnCommitAttachment);
         }
 
         if (rlTaskTxnCommitAttachment != null && !Strings.isNullOrEmpty(rlTaskTxnCommitAttachment.getErrorLogUrl())) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index d101d98cf850a6..301efe4d9c9604 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -76,6 +76,8 @@ public abstract class RoutineLoadTaskInfo {
     protected static final int MAX_TIMEOUT_BACK_OFF_COUNT = 3;
     protected int timeoutBackOffCount = 0;
 
+    protected boolean isEof = false;
+
     // this status will be set when corresponding transaction's status is changed.
     // so that user or other logic can know the status of the corresponding txn.
     protected TransactionStatus txnStatus = TransactionStatus.UNKNOWN;
@@ -160,6 +162,10 @@ public int getTimeoutBackOffCount() {
         return timeoutBackOffCount;
     }
 
+    public boolean getIsEof() {
+        return isEof;
+    }
+
     public boolean isTimeout() {
         if (txnStatus == TransactionStatus.COMMITTED || txnStatus == TransactionStatus.VISIBLE) {
             // the corresponding txn is already finished, this task can not be treated as timeout.
@@ -174,7 +180,12 @@ public boolean isTimeout() {
         return false;
     }
 
-    public void selfAdaptTimeout(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) {
+    public void handleTaskByTxnCommitAttachment(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) {
+        selfAdaptTimeout(rlTaskTxnCommitAttachment);
+        judgeEof(rlTaskTxnCommitAttachment);
+    }
+
+    private void selfAdaptTimeout(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) {
         long taskExecutionTime = rlTaskTxnCommitAttachment.getTaskExecutionTimeMs();
         long timeoutMs = this.timeoutMs;
 
@@ -189,6 +200,18 @@ public void selfAdaptTimeout(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment
         this.timeoutMs = timeoutMs;
     }
 
+    /**
+     * Marks this task as EOF when the committed batch stayed below the job's row and byte
+     * limits and its execution time was below this task's timeout. The flag is recomputed on
+     * every call, so a later full batch clears an EOF mark left by an earlier one.
+     */
+    private void judgeEof(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) {
+        RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId);
+        this.isEof = rlTaskTxnCommitAttachment.getTotalRows() < routineLoadJob.getMaxBatchRows()
+                && rlTaskTxnCommitAttachment.getReceivedBytes() < routineLoadJob.getMaxBatchSizeBytes()
+                && rlTaskTxnCommitAttachment.getTaskExecutionTimeMs() < this.timeoutMs;
+    }
+
     abstract TRoutineLoadTask createRoutineLoadTask() throws UserException;
 
     // begin the txn of this task
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index d4d5d5512d31e3..8afc35411b55f5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -101,6 +101,16 @@ private void process() throws UserException, InterruptedException {
         try {
             // This step will be blocked when queue is empty
             RoutineLoadTaskInfo routineLoadTaskInfo = needScheduleTasksQueue.take();
+            // try to delay scheduling tasks that are perceived as Eof until MaxBatchInterval has
+            // elapsed, to avoid too many small transactions (elapsed time is compared in ms)
+            if (routineLoadTaskInfo.getIsEof()) {
+                RoutineLoadJob routineLoadJob = routineLoadManager.getJob(routineLoadTaskInfo.getJobId());
+                if (System.currentTimeMillis() - routineLoadTaskInfo.getLastScheduledTime()
+                        < routineLoadJob.getMaxBatchIntervalS() * 1000L) {
+                    needScheduleTasksQueue.addLast(routineLoadTaskInfo);
+                    return;
+                }
+            }
             scheduleOneTask(routineLoadTaskInfo);
         } catch (Exception e) {
             LOG.warn("Taking routine load task from queue has been interrupted", e);
@@ -108,6 +118,7 @@ private void process() throws UserException, InterruptedException {
     }
 
     private void scheduleOneTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws Exception {
+        routineLoadTaskInfo.setLastScheduledTime(System.currentTimeMillis());
         if (LOG.isDebugEnabled()) {
             LOG.debug("schedule routine load task info {} for job {}",
                     routineLoadTaskInfo.id, routineLoadTaskInfo.getJobId());