diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java index d6b0bdb96cbe..4acd0924788e 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/HybridSplitAssigner.java @@ -114,7 +114,8 @@ public Optional getNext() { @Override public boolean waitingForCompletedSplits() { - return snapshotSplitAssigner.waitingForCompletedSplits(); + return snapshotSplitAssigner.waitingForCompletedSplits() + || incrementalSplitAssigner.waitingForAssignedSplits(); } @Override diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java index b17b910e5d89..87ca7dc507ae 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSourceEnumerator.java @@ -177,8 +177,16 @@ private void assignSplits() { awaitingReader.remove(); LOG.debug("Assign split {} to subtask {}", sourceSplit, nextAwaiting); } else { - // there is no available splits by now, skip assigning - break; + if (splitAssigner.waitingForCompletedSplits()) { + // there is no available 
splits by now, skip assigning + break; + } else { + LOG.info( + "No more splits available, signal no more splits to subtask {}", + nextAwaiting); + context.signalNoMoreSplits(nextAwaiting); + awaitingReader.remove(); + } } } } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java index 7b45ee1ef619..1accc47af232 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java @@ -278,4 +278,8 @@ public boolean completedSnapshotPhase(List tableIds) { return context.getAssignedSnapshotSplit().isEmpty() && context.getSplitCompletedOffsets().isEmpty(); } + + public boolean waitingForAssignedSplits() { + return !(splitAssigned && noMoreSplits()); + } } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java index 829f68763da4..1370278a609e 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java @@ -108,7 +108,13 @@ public void pollNext(Collector output) throws Exception { 
context.sendSplitRequest(); needSendSplitRequest.compareAndSet(true, false); } - super.pollNext(output); + + if (isNoMoreSplitsAssignment() && isNoMoreRecords()) { + log.info("Reader {} send NoMoreElement event", context.getIndexOfSubtask()); + context.signalNoMoreElement(); + } else { + super.pollNext(output); + } } @Override diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java index 29dd2ff6f5e4..9a382cf0e8e9 100644 --- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java +++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java @@ -65,7 +65,7 @@ public abstract class SourceReaderBase currentFetch; protected SplitContext currentSplitContext; private Collector currentSplitOutput; - private boolean noMoreSplitsAssignment; + @Getter private volatile boolean noMoreSplitsAssignment; public SourceReaderBase( BlockingQueue> elementsQueue, @@ -94,10 +94,11 @@ public void pollNext(Collector output) throws Exception { if (recordsWithSplitId == null) { if (Boundedness.BOUNDED.equals(context.getBoundedness()) && noMoreSplitsAssignment - && splitFetcherManager.maybeShutdownFinishedFetchers() - && elementsQueue.isEmpty()) { + && isNoMoreRecords()) { context.signalNoMoreElement(); - log.info("Send NoMoreElement event"); + log.info( + "Reader {} into idle state, send NoMoreElement event", + context.getIndexOfSubtask()); } return; } @@ -137,7 +138,7 @@ public void addSplits(List splits) { @Override public void handleNoMoreSplits() { - log.info("Reader received NoMoreSplits event."); + log.info("Reader {} received NoMoreSplits event.", context.getIndexOfSubtask()); 
noMoreSplitsAssignment = true; } @@ -146,9 +147,15 @@ public void handleSourceEvent(SourceEvent sourceEvent) { log.info("Received unhandled source event: {}", sourceEvent); } + protected boolean isNoMoreRecords() { + return splitFetcherManager.maybeShutdownFinishedFetchers() + && elementsQueue.isEmpty() + && currentFetch == null; + } + @Override public void close() { - log.info("Closing Source Reader."); + log.info("Closing Source Reader {}.", context.getIndexOfSubtask()); try { splitFetcherManager.close(options.getSourceReaderCloseTimeout()); } catch (Exception e) { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointBarrier.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointBarrier.java index 7179cc8cb354..ce3cc56bd17c 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointBarrier.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointBarrier.java @@ -18,27 +18,46 @@ package org.apache.seatunnel.engine.server.checkpoint; import org.apache.seatunnel.engine.core.checkpoint.CheckpointType; +import org.apache.seatunnel.engine.server.execution.TaskLocation; import org.apache.seatunnel.engine.server.task.record.Barrier; import com.google.common.base.Objects; +import lombok.Getter; import java.io.Serializable; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; import static com.google.common.base.Preconditions.checkNotNull; +@Getter public class CheckpointBarrier implements Barrier, Serializable { private final long id; private final long timestamp; private final CheckpointType checkpointType; + private final Set prepareCloseTasks; + private final Set closedTasks; public CheckpointBarrier(long id, long timestamp, CheckpointType checkpointType) { + this(id, timestamp, 
checkpointType, Collections.emptySet(), Collections.emptySet()); + } + + public CheckpointBarrier( + long id, + long timestamp, + CheckpointType checkpointType, + Set prepareCloseTasks, + Set closedTasks) { this.id = id; this.timestamp = timestamp; this.checkpointType = checkNotNull(checkpointType); - } - - public long getId() { - return id; + this.prepareCloseTasks = prepareCloseTasks; + this.closedTasks = closedTasks; + if (new HashSet(prepareCloseTasks).removeAll(closedTasks)) { + throw new IllegalArgumentException( + "The prepareCloseTasks collection should not contain elements of the closedTasks collection"); + } } @Override @@ -51,12 +70,17 @@ public boolean prepareClose() { return checkpointType.isFinalCheckpoint(); } - public long getTimestamp() { - return timestamp; + @Override + public boolean prepareClose(TaskLocation task) { + if (prepareClose()) { + return true; + } + return prepareCloseTasks.contains(task); } - public CheckpointType getCheckpointType() { - return checkpointType; + @Override + public Set closedTasks() { + return Collections.unmodifiableSet(closedTasks); } @Override @@ -81,7 +105,8 @@ public boolean equals(Object other) { @Override public String toString() { return String.format( - "CheckpointBarrier %d @ %d Options: %s", id, timestamp, checkpointType); + "CheckpointBarrier %d @ %d type: %s, prepareCloseTasks: %s, closedTasks: %s", + id, timestamp, checkpointType, prepareCloseTasks, closedTasks); } public boolean isAuto() { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java index 313e4e9dd664..a1eec6dee7f6 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java +++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java @@ -55,6 +55,7 @@ import java.time.Instant; import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -111,6 +112,8 @@ public class CheckpointCoordinator { private final CheckpointPlan plan; private final Set readyToCloseStartingTask; + private final Set readyToCloseIdleTask; + private final Set closedIdleTask; private final ConcurrentHashMap pendingCheckpoints; private final ArrayDeque completedCheckpointIds; @@ -189,6 +192,8 @@ public CheckpointCoordinator( this.pipelineTaskStatus = new ConcurrentHashMap<>(); this.checkpointIdCounter = checkpointIdCounter; this.readyToCloseStartingTask = new CopyOnWriteArraySet<>(); + this.readyToCloseIdleTask = new CopyOnWriteArraySet<>(); + this.closedIdleTask = new CopyOnWriteArraySet<>(); LOG.info( "Create CheckpointCoordinator for job({}@{}) with plan({})", @@ -309,7 +314,11 @@ private void restoreTaskState(TaskLocation taskLocation) { for (int i = tuple.f1(); i < actionState.getParallelism(); i += currentParallelism) { - states.add(actionState.getSubtaskStates().get(i)); + ActionSubtaskState subtaskState = + actionState.getSubtaskStates().get(i); + if (subtaskState != null) { + states.add(subtaskState); + } } }); } @@ -397,6 +406,46 @@ protected void readyToClose(TaskLocation taskLocation) { } } + protected void readyToCloseIdleTask(TaskLocation taskLocation) { + if (plan.getStartingSubtasks().contains(taskLocation)) { + throw new UnsupportedOperationException("Unsupported close starting task"); + } + + LOG.info("Received close idle task: {}", taskLocation); + synchronized (readyToCloseIdleTask) { + if (readyToCloseIdleTask.contains(taskLocation) + || closedIdleTask.contains(taskLocation)) { + LOG.warn("Task {} already in readyToCloseIdleTask or closedIdleTask", taskLocation); + return; + } + + List 
subTaskList = new ArrayList<>(); + for (TaskLocation subTask : plan.getPipelineSubtasks()) { + if (subTask.getTaskGroupLocation().equals(taskLocation.getTaskGroupLocation())) { + // close all subtask in the same task group + subTaskList.add(subTask); + LOG.info("Add task {} to prepare close list", subTask.getTaskID()); + } + } + if (subTaskList.size() != 2) { + throw new UnsupportedOperationException( + "Unsupported close not reader/writer task group: " + subTaskList); + } + readyToCloseIdleTask.addAll(subTaskList); + tryTriggerPendingCheckpoint(CheckpointType.CHECKPOINT_TYPE); + } + } + + protected void completedCloseIdleTask(TaskLocation taskLocation) { + synchronized (readyToCloseIdleTask) { + if (readyToCloseIdleTask.contains(taskLocation)) { + readyToCloseIdleTask.remove(taskLocation); + closedIdleTask.add(taskLocation); + LOG.info("Completed close idle task {}", taskLocation.getTaskID()); + } + } + } + protected void restoreCoordinator(boolean alreadyStarted) { LOG.info("received restore CheckpointCoordinator with alreadyStarted= " + alreadyStarted); errorByPhysicalVertex = new AtomicReference<>(); @@ -553,7 +602,9 @@ private void startTriggerPendingCheckpoint( pendingCheckpoint.getCheckpointId(), pendingCheckpoint .getCheckpointTimestamp(), - pendingCheckpoint.getCheckpointType()), + pendingCheckpoint.getCheckpointType(), + new HashSet<>(readyToCloseIdleTask), + new HashSet<>(closedIdleTask)), executorService) .thenApplyAsync(this::triggerCheckpoint, executorService); @@ -666,6 +717,7 @@ private CompletableFuture triggerPendingCheckpoint( private Set getNotYetAcknowledgedTasks() { // TODO: some tasks have completed and don't need to be ack return plan.getPipelineSubtasks().stream() + .filter(e -> !closedIdleTask.contains(e)) .map(TaskLocation::getTaskID) .collect(Collectors.toCollection(CopyOnWriteArraySet::new)); } @@ -715,6 +767,8 @@ protected void cleanPendingCheckpoint(CheckpointCloseReason closedReason) { } pipelineTaskStatus.clear(); 
readyToCloseStartingTask.clear(); + readyToCloseIdleTask.clear(); + closedIdleTask.clear(); pendingCounter.set(0); schemaChanging.set(false); scheduler.shutdownNow(); @@ -752,6 +806,11 @@ protected void acknowledgeTask(TaskAcknowledgeOperation ackOperation) { pendingCheckpoint.getCheckpointType().isSavepoint() ? SubtaskStatus.SAVEPOINT_PREPARE_CLOSE : SubtaskStatus.RUNNING); + + if (ackOperation.getBarrier().getCheckpointType().isGeneralCheckpoint() + && ackOperation.getBarrier().prepareClose(location)) { + completedCloseIdleTask(location); + } } public synchronized void completePendingCheckpoint(CompletedCheckpoint completedCheckpoint) { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java index 9ef2e6623be9..21c2b90df570 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java @@ -216,6 +216,15 @@ public void readyToClose(TaskLocation taskLocation) { getCheckpointCoordinator(taskLocation).readyToClose(taskLocation); } + /** + * Called by the {@link SourceSplitEnumeratorTask}.
+ * Used by the SourceSplitEnumeratorTask to tell the CheckpointCoordinator that the pipeline + * should trigger a close barrier for an idle task. + */ + public void readyToCloseIdleTask(TaskLocation taskLocation) { + getCheckpointCoordinator(taskLocation).readyToCloseIdleTask(taskLocation); + } + /** * Called by the JobMaster.
* Listen to the {@link PipelineStatus} of the {@link Pipeline}, which is used to shut down the diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java index 6b90fce8e290..cf6aaa895cb2 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java @@ -389,6 +389,46 @@ public JobDAGInfo getJobDAGInfo() { return jobDAGInfo; } + public void releaseTaskGroupResource( + PipelineLocation pipelineLocation, TaskGroupLocation taskGroupLocation) { + Map taskGroupLocationSlotProfileMap = + ownedSlotProfilesIMap.get(pipelineLocation); + if (taskGroupLocationSlotProfileMap == null) { + return; + } + SlotProfile taskGroupSlotProfile = taskGroupLocationSlotProfileMap.get(taskGroupLocation); + if (taskGroupSlotProfile == null) { + return; + } + + try { + RetryUtils.retryWithException( + () -> { + LOGGER.info( + String.format( + "release the task group resource %s", taskGroupLocation)); + + resourceManager + .releaseResources( + jobImmutableInformation.getJobId(), + Collections.singletonList(taskGroupSlotProfile)) + .join(); + + return null; + }, + new RetryUtils.RetryMaterial( + Constant.OPERATION_RETRY_TIME, + true, + exception -> ExceptionUtil.isOperationNeedRetryException(exception), + Constant.OPERATION_RETRY_SLEEP)); + } catch (Exception e) { + LOGGER.warning( + String.format( + "release the task group resource failed %s, with exception: %s ", + taskGroupLocation, ExceptionUtils.getMessage(e))); + } + } + public void releasePipelineResource(SubPlan subPlan) { try { Map taskGroupLocationSlotProfileMap = @@ -663,6 +703,13 @@ public void updateTaskExecutionState(TaskExecutionState taskExecutionState) { 
task.updateStateByExecutionService( taskExecutionState); + if (taskExecutionState + .getExecutionState() + .isEndState()) { + releaseTaskGroupResource( + pipeline.getPipelineLocation(), + task.getTaskGroupLocation()); + } }); }); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializer.java index 90caab3c188d..975810693d9f 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializer.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializer.java @@ -45,6 +45,8 @@ public void write(ObjectDataOutput out, Record record) throws IOException { out.writeLong(checkpointBarrier.getId()); out.writeLong(checkpointBarrier.getTimestamp()); out.writeString(checkpointBarrier.getCheckpointType().getName()); + out.writeObject(checkpointBarrier.getPrepareCloseTasks()); + out.writeObject(checkpointBarrier.getClosedTasks()); } else if (data instanceof SeaTunnelRow) { SeaTunnelRow row = (SeaTunnelRow) data; out.writeByte(RecordDataType.SEATUNNEL_ROW.ordinal()); @@ -67,7 +69,11 @@ public Record read(ObjectDataInput in) throws IOException { if (dataType == RecordDataType.CHECKPOINT_BARRIER.ordinal()) { data = new CheckpointBarrier( - in.readLong(), in.readLong(), CheckpointType.fromName(in.readString())); + in.readLong(), + in.readLong(), + CheckpointType.fromName(in.readString()), + in.readObject(), + in.readObject()); } else if (dataType == RecordDataType.SEATUNNEL_ROW.ordinal()) { String tableId = in.readString(); byte rowKind = in.readByte(); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java index 7c298272c7d2..a9267675761e 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java @@ -37,6 +37,7 @@ import org.apache.seatunnel.engine.server.task.operation.sink.SinkPrepareCommitOperation; import org.apache.seatunnel.engine.server.task.operation.sink.SinkRegisterOperation; import org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation; +import org.apache.seatunnel.engine.server.task.operation.source.CloseIdleReaderOperation; import org.apache.seatunnel.engine.server.task.operation.source.LastCheckpointNotifyOperation; import org.apache.seatunnel.engine.server.task.operation.source.RequestSplitOperation; import org.apache.seatunnel.engine.server.task.operation.source.RestoredSplitOperation; @@ -101,6 +102,8 @@ public class TaskDataSerializerHook implements DataSerializerHook { public static final int REPORT_JOB_EVENT = 25; + public static final int CLOSE_READER_OPERATION = 26; + public static final int FACTORY_ID = FactoryIdHelper.getFactoryId( SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY, @@ -171,6 +174,8 @@ public IdentifiedDataSerializable create(int typeId) { return new DeleteConnectorJarInExecutionNode(); case REPORT_JOB_EVENT: return new JobEventReportOperation(); + case CLOSE_READER_OPERATION: + return new CloseIdleReaderOperation(); default: throw new IllegalArgumentException("Unknown type id " + typeId); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java index 
3ee636bbc581..d66921c74239 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java @@ -339,7 +339,8 @@ public void ack(Barrier barrier) { Integer ackSize = cycleAcks.compute(barrier.getId(), (id, count) -> count == null ? 1 : ++count); if (ackSize == allCycles.size()) { - if (barrier.prepareClose()) { + cycleAcks.remove(barrier.getId()); + if (barrier.prepareClose(this.taskLocation)) { this.prepareCloseStatus = true; this.prepareCloseBarrierId.set(barrier.getId()); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java index 48c3ca197d2f..34a14cf5850d 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java @@ -216,6 +216,12 @@ public void close() throws IOException { } } + private long getClosedWriters(Barrier barrier) { + return barrier.closedTasks().stream() + .filter(task -> writerAddressMap.containsKey(task.getTaskID())) + .count(); + } + @Override public void triggerBarrier(Barrier barrier) throws Exception { long startTime = System.currentTimeMillis(); @@ -224,10 +230,11 @@ public void triggerBarrier(Barrier barrier) throws Exception { Integer count = checkpointBarrierCounter.compute( barrier.getId(), (id, num) -> num == null ? 
1 : ++num); - if (count != maxWriterSize) { + + if (count != (maxWriterSize - getClosedWriters(barrier))) { return; } - if (barrier.prepareClose()) { + if (barrier.prepareClose(this.taskLocation)) { this.prepareCloseStatus = true; this.prepareCloseBarrierId.set(barrier.getId()); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java index e95684c1c517..cae3532b058e 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.engine.server.task; import org.apache.seatunnel.api.serialization.Serializer; +import org.apache.seatunnel.api.source.Boundedness; import org.apache.seatunnel.api.source.SourceEvent; import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.source.SourceSplitEnumerator; @@ -32,12 +33,12 @@ import org.apache.seatunnel.engine.server.execution.TaskLocation; import org.apache.seatunnel.engine.server.task.context.SeaTunnelSplitEnumeratorContext; import org.apache.seatunnel.engine.server.task.operation.checkpoint.BarrierFlowOperation; +import org.apache.seatunnel.engine.server.task.operation.source.CloseIdleReaderOperation; import org.apache.seatunnel.engine.server.task.operation.source.LastCheckpointNotifyOperation; import org.apache.seatunnel.engine.server.task.record.Barrier; import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState; import com.hazelcast.cluster.Address; -import com.hazelcast.spi.impl.operationservice.Operation; import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture; import lombok.NonNull; import 
lombok.extern.slf4j.Slf4j; @@ -57,7 +58,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutionException; -import java.util.function.Function; import java.util.stream.Collectors; import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky; @@ -144,7 +144,7 @@ public void triggerBarrier(Barrier barrier) throws Exception { long startTime = System.currentTimeMillis(); log.debug("split enumer trigger barrier [{}]", barrier); - if (barrier.prepareClose()) { + if (barrier.prepareClose(this.taskLocation)) { this.prepareCloseTriggered = true; this.prepareCloseBarrierId.set(barrier.getId()); } @@ -158,7 +158,7 @@ public void triggerBarrier(Barrier barrier) throws Exception { serialize = enumeratorStateSerializer.serialize(snapshotState); } log.debug("source split enumerator send state [{}] to master", snapshotState); - sendToAllReader(location -> new BarrierFlowOperation(barrier, location)); + sendToActiveReader(barrier); } if (barrier.snapshot()) { this.getExecutionContext() @@ -276,10 +276,18 @@ private SourceSplitEnumerator getEnumerator() return enumerator; } - public void readerFinished(long taskID) { - unfinishedReaders.remove(taskID); + public void readerFinished(TaskLocation taskLocation) { + unfinishedReaders.remove(taskLocation.getTaskID()); if (unfinishedReaders.isEmpty()) { prepareCloseStatus = true; + } else if (Boundedness.UNBOUNDED.equals(this.source.getSource().getBoundedness())) { + log.info( + "Send close idle reader {} operation of unbounded job, taskLocation: {}", + taskLocation.getTaskIndex(), + taskLocation); + this.getExecutionContext() + .sendToMaster(new CloseIdleReaderOperation(jobID, taskLocation)) + .join(); } } @@ -348,10 +356,13 @@ public Set getRegisteredReaders() { .collect(Collectors.toSet()); } - private void sendToAllReader(Function function) { + private void sendToActiveReader(Barrier barrier) { List> futures = new ArrayList<>(); 
taskMemberMapping.forEach( (location, address) -> { + if (barrier.closedTasks().contains(location)) { + return; + } log.debug( "split enumerator send to read--size: {}, location: {}, address: {}", taskMemberMapping.size(), @@ -359,7 +370,8 @@ private void sendToAllReader(Function function) { address.toString()); futures.add( this.getExecutionContext() - .sendToMember(function.apply(location), address)); + .sendToMember( + new BarrierFlowOperation(barrier, location), address)); }); futures.forEach(InvocationFuture::join); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java index 32ec5cb8f4d0..9bdbd37f80ac 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java @@ -78,7 +78,7 @@ public void received(Record record) throws IOException { shuffleFlush(); Barrier barrier = (Barrier) record.getData(); - if (barrier.prepareClose()) { + if (barrier.prepareClose(runningTask.getTaskLocation())) { prepareClose = true; } if (barrier.snapshot()) { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java index 2f14c6770114..87b29eab21df 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java @@ -107,7 
+107,7 @@ public void collect(Collector> collector) throws Exception { // publish barrier if (alignedBarriersCounter == shuffles.length) { - if (barrier.prepareClose()) { + if (barrier.prepareClose(runningTask.getTaskLocation())) { prepareClose = true; } if (barrier.snapshot()) { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java index 1a2143b0271a..48c530a0c36f 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java @@ -183,7 +183,7 @@ public void received(Record record) { long startTime = System.currentTimeMillis(); Barrier barrier = (Barrier) record.getData(); - if (barrier.prepareClose()) { + if (barrier.prepareClose(this.taskLocation)) { prepareClose = true; } if (barrier.snapshot()) { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java index 64e5bfd22b8c..83675575b4b0 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java @@ -268,7 +268,7 @@ public void triggerBarrier(Barrier barrier) throws Exception { // Block the reader from adding barrier to the collector. 
synchronized (collector.getCheckpointLock()) { - if (barrier.prepareClose()) { + if (barrier.prepareClose(this.currentTaskLocation)) { this.prepareClose = true; } if (barrier.snapshot()) { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java index 187aa3659bee..0447513b5fff 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java @@ -75,7 +75,7 @@ public void open() throws Exception { public void received(Record record) { if (record.getData() instanceof Barrier) { CheckpointBarrier barrier = (CheckpointBarrier) record.getData(); - if (barrier.prepareClose()) { + if (barrier.prepareClose(this.runningTask.getTaskLocation())) { prepareClose = true; } if (barrier.snapshot()) { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java index f7cd7b0c2346..b8e53faabd46 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java @@ -64,7 +64,7 @@ private void handleRecord(Record record, ConsumerWithException> con if (record.getData() instanceof Barrier) { CheckpointBarrier barrier = (CheckpointBarrier) record.getData(); getRunningTask().ack(barrier); - if (barrier.prepareClose()) { + if 
(barrier.prepareClose(this.getRunningTask().getTaskLocation())) { getIntermediateQueueFlowLifeCycle().setPrepareClose(true); } consumer.accept(record); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventHandler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventHandler.java index d525a0b257b4..6cc5195de562 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventHandler.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventHandler.java @@ -54,7 +54,7 @@ private void handleRecord(Record record, Collector> collector) thro if (record.getData() instanceof Barrier) { CheckpointBarrier barrier = (CheckpointBarrier) record.getData(); runningTask.ack(barrier); - if (barrier.prepareClose()) { + if (barrier.prepareClose(this.runningTask.getTaskLocation())) { this.intermediateQueueFlowLifeCycle.setPrepareClose(true); } } else { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventProducer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventProducer.java index 021bb8d2f083..ea47f83a7971 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventProducer.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventProducer.java @@ -34,7 +34,8 @@ public static void onData( if (record.getData() instanceof Barrier) { CheckpointBarrier barrier = (CheckpointBarrier) record.getData(); 
intermediateQueueFlowLifeCycle.getRunningTask().ack(barrier); - if (barrier.prepareClose()) { + if (barrier.prepareClose( + intermediateQueueFlowLifeCycle.getRunningTask().getTaskLocation())) { intermediateQueueFlowLifeCycle.setPrepareClose(true); } } else { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseIdleReaderOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseIdleReaderOperation.java new file mode 100644 index 000000000000..abedf1a4993d --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseIdleReaderOperation.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.engine.server.task.operation.source; + +import org.apache.seatunnel.engine.server.SeaTunnelServer; +import org.apache.seatunnel.engine.server.execution.TaskLocation; +import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook; + +import com.hazelcast.nio.ObjectDataInput; +import com.hazelcast.nio.ObjectDataOutput; +import com.hazelcast.nio.serialization.IdentifiedDataSerializable; +import com.hazelcast.spi.impl.operationservice.Operation; + +import java.io.IOException; + +public class CloseIdleReaderOperation extends Operation implements IdentifiedDataSerializable { + private long jobId; + private TaskLocation taskLocation; + + public CloseIdleReaderOperation() {} + + public CloseIdleReaderOperation(long jobId, TaskLocation taskLocation) { + this.jobId = jobId; + this.taskLocation = taskLocation; + } + + @Override + public void run() throws Exception { + SeaTunnelServer server = getService(); + server.getCoordinatorService() + .getJobMaster(jobId) + .getCheckpointManager() + .readyToCloseIdleTask(taskLocation); + } + + @Override + protected void writeInternal(ObjectDataOutput out) throws IOException { + out.writeLong(jobId); + out.writeObject(taskLocation); + } + + @Override + protected void readInternal(ObjectDataInput in) throws IOException { + jobId = in.readLong(); + taskLocation = in.readObject(); + } + + @Override + public int getFactoryId() { + return TaskDataSerializerHook.FACTORY_ID; + } + + @Override + public int getClassId() { + return TaskDataSerializerHook.CLOSE_READER_OPERATION; + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java index fec6afcb91e6..60b67c0c83bf 100644 --- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java @@ -57,7 +57,7 @@ public void run() throws Exception { Thread.currentThread().setContextClassLoader(classLoader); SourceSplitEnumeratorTask task = server.getTaskExecutionService().getTask(enumeratorTaskID); - task.readerFinished(currentTaskID.getTaskID()); + task.readerFinished(currentTaskID); Thread.currentThread().setContextClassLoader(oldClassLoader); return null; }, diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/Barrier.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/Barrier.java index 67042247b62c..4350c6337277 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/Barrier.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/record/Barrier.java @@ -17,6 +17,10 @@ package org.apache.seatunnel.engine.server.task.record; +import org.apache.seatunnel.engine.server.execution.TaskLocation; + +import java.util.Set; + /** barrier flowing in data flow */ public interface Barrier { Long PREPARE_CLOSE_BARRIER_ID = Long.MAX_VALUE; @@ -32,4 +36,21 @@ public interface Barrier { /** Barrier indicating that the task should prepare to close. */ boolean prepareClose(); + + /** + * Barrier indicating that the task should prepare to close. + * + * @param task task location + * @return If the task is included, the return true + */ + default boolean prepareClose(TaskLocation task) { + return prepareClose(); + } + + /** + * Indicates a list of tasks that have been closed. + * + * @return + */ + Set closedTasks(); }