/**
 * Renders a title and a list of key/value pairs as a fixed-width text table
 * framed by lines of asterisks.
 */
public class StringFormatUtils {
    /** Total table width in characters; equals the 26 + 2 + 19 columns of {@link #ROW_FORMAT}. */
    private static final int NUM = 47;

    /** One key/value row: key left-aligned in 26 columns, value right-aligned in 19. */
    private static final String ROW_FORMAT = "%-26s: %19s\n";

    /** Horizontal rule made of {@link #NUM} asterisks. */
    private static final String SEPARATOR = repeat('*', NUM);

    private StringFormatUtils() {
        // utility class can not be instantiated
    }

    /**
     * Formats a table whose first argument is the title and whose remaining
     * arguments are alternating keys and values.
     *
     * <p>The title is centred within the table width; a title wider than the
     * table is emitted without padding instead of failing. A trailing key that
     * has no matching value is ignored.
     *
     * @param objects the title followed by {@code key, value, key, value, ...}
     * @return the rendered table, starting and ending with a newline
     * @throws IllegalArgumentException if no title is supplied
     */
    public static String formatTable(Object... objects) {
        if (objects == null || objects.length == 0) {
            throw new IllegalArgumentException("formatTable requires at least a title argument");
        }
        String title = String.valueOf(objects[0]);
        // Clamp at zero: a title longer than the table would otherwise yield a negative pad width.
        int blankNum = Math.max(0, (NUM - title.length()) / 2);
        int kvNum = (objects.length - 1) / 2;

        StringBuilder table = new StringBuilder();
        table.append('\n').append(SEPARATOR).append('\n');
        table.append(repeat(' ', blankNum)).append(title).append('\n');
        table.append(SEPARATOR).append('\n');
        for (int row = 0; row < kvNum; row++) {
            Object key = objects[2 * row + 1];
            Object value = objects[2 * row + 2];
            table.append(String.format(ROW_FORMAT, key, value));
        }
        table.append(SEPARATOR).append('\n');
        return table.toString();
    }

    /** Returns a string consisting of {@code count} copies of {@code symbol}. */
    private static String repeat(char symbol, int count) {
        StringBuilder repeated = new StringBuilder(count);
        for (int i = 0; i < count; i++) {
            repeated.append(symbol);
        }
        return repeated.toString();
    }
}
+ */ + +package org.apache.seatunnel.common.utils; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class StringFormatUtilsTest { + @Test + public void testStringFormat() { + String s = StringFormatUtils.formatTable("Job Statistic Information", + "Start Time", + "2023-01-11 00:00:00", + + "End Time", + "2023-01-11 00:00:00", + + "Total Time(s)", + 0, + + "Total Read Count", + 0, + + "Total Write Count", + 0, + + "Total Failed Count", + 0); + Assertions.assertEquals(s, "\n" + + "***********************************************\n" + + " Job Statistic Information\n" + + "***********************************************\n" + + "Start Time : 2023-01-11 00:00:00\n" + + "End Time : 2023-01-11 00:00:00\n" + + "Total Time(s) : 0\n" + + "Total Read Count : 0\n" + + "Total Write Count : 0\n" + + "Total Failed Count : 0\n" + + "***********************************************\n"); + } +} diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java index fd52bc2d0cf..0fbd3ee5e2d 100644 --- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java +++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java @@ -19,6 +19,8 @@ import static org.apache.seatunnel.core.starter.utils.FileUtils.checkConfigExist; +import org.apache.seatunnel.common.utils.DateTimeUtils; +import org.apache.seatunnel.common.utils.StringFormatUtils; import org.apache.seatunnel.core.starter.command.Command; import org.apache.seatunnel.core.starter.enums.MasterType; import org.apache.seatunnel.core.starter.exception.CommandExecuteException; @@ -27,6 +29,7 @@ import org.apache.seatunnel.engine.client.SeaTunnelClient; import 
org.apache.seatunnel.engine.client.job.ClientJobProxy; import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment; +import org.apache.seatunnel.engine.client.job.JobMetricsRunner; import org.apache.seatunnel.engine.common.config.ConfigProvider; import org.apache.seatunnel.engine.common.config.JobConfig; import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; @@ -38,8 +41,13 @@ import lombok.extern.slf4j.Slf4j; import java.nio.file.Path; +import java.time.Duration; +import java.time.LocalDateTime; import java.util.Random; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; /** * This command is used to execute the SeaTunnel engine job by SeaTunnel API. @@ -53,11 +61,15 @@ public ClientExecuteCommand(ClientCommandArgs clientCommandArgs) { this.clientCommandArgs = clientCommandArgs; } - @SuppressWarnings("checkstyle:RegexpSingleline") + @SuppressWarnings({"checkstyle:RegexpSingleline", "checkstyle:MagicNumber"}) @Override public void execute() throws CommandExecuteException { HazelcastInstance instance = null; SeaTunnelClient engineClient = null; + ScheduledExecutorService executorService = null; + JobMetricsRunner.JobMetricsSummary jobMetricsSummary = null; + LocalDateTime startTime = LocalDateTime.now(); + LocalDateTime endTime = LocalDateTime.now(); SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig(); try { String clusterName = clientCommandArgs.getClusterName(); @@ -87,11 +99,22 @@ public void execute() throws CommandExecuteException { jobConfig.setName(clientCommandArgs.getJobName()); JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(configFile.toString(), jobConfig); - + // get job start time + startTime = LocalDateTime.now(); + // create job proxy ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); - clientJobProxy.waitForJobComplete(); + // get job id 
long jobId = clientJobProxy.getJobId(); - System.out.println(engineClient.getJobMetrics(jobId)); + JobMetricsRunner jobMetricsRunner = new JobMetricsRunner(engineClient, jobId); + executorService = Executors.newSingleThreadScheduledExecutor(); + executorService.scheduleAtFixedRate(jobMetricsRunner, 0, + seaTunnelConfig.getEngineConfig().getPrintJobMetricsInfoInterval(), TimeUnit.SECONDS); + // wait for job complete + clientJobProxy.waitForJobComplete(); + // get job end time + endTime = LocalDateTime.now(); + // get job statistic information when job finished + jobMetricsSummary = engineClient.getJobMetricsSummary(jobId); } } catch (ExecutionException | InterruptedException e) { throw new CommandExecuteException("SeaTunnel job executed failed", e); @@ -102,6 +125,31 @@ public void execute() throws CommandExecuteException { if (instance != null) { instance.shutdown(); } + if (executorService != null) { + executorService.shutdown(); + } + if (jobMetricsSummary != null) { + // print job statistics information when job finished + log.info(StringFormatUtils.formatTable( + "Job Statistic Information", + "Start Time", + DateTimeUtils.toString(startTime, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS), + + "End Time", + DateTimeUtils.toString(endTime, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS), + + "Total Time(s)", + Duration.between(startTime, endTime).getSeconds(), + + "Total Read Count", + jobMetricsSummary.getSourceReadCount(), + + "Total Write Count", + jobMetricsSummary.getSinkWriteCount(), + + "Total Failed Count", + jobMetricsSummary.getSourceReadCount() - jobMetricsSummary.getSinkWriteCount())); + } } } diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java index 691f25c6f1b..7ed4ab94903 100644 --- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java +++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.engine.client.job.JobClient; import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment; +import org.apache.seatunnel.engine.client.job.JobMetricsRunner.JobMetricsSummary; import org.apache.seatunnel.engine.common.config.JobConfig; import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; import org.apache.seatunnel.engine.core.job.JobDAGInfo; @@ -31,11 +32,15 @@ import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelListJobStatusCodec; import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.hazelcast.client.config.ClientConfig; import com.hazelcast.logging.ILogger; import lombok.NonNull; public class SeaTunnelClient implements SeaTunnelClientInstance { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private final SeaTunnelHazelcastClient hazelcastClient; public SeaTunnelClient(@NonNull ClientConfig clientConfig) { @@ -127,4 +132,25 @@ public JobDAGInfo getJobInfo(Long jobId) { SeaTunnelGetJobInfoCodec::decodeResponse )); } + + public JobMetricsSummary getJobMetricsSummary(Long jobId) { + long sourceReadCount = 0L; + long sinkWriteCount = 0L; + String jobMetrics = getJobMetrics(jobId); + try { + JsonNode jsonNode = OBJECT_MAPPER.readTree(jobMetrics); + JsonNode sourceReaders = jsonNode.get("SourceReceivedCount"); + JsonNode sinkWriters = jsonNode.get("SinkWriteCount"); + for (int i = 0; i < sourceReaders.size(); i++) { + JsonNode sourceReader = sourceReaders.get(i); + JsonNode sinkWriter = sinkWriters.get(i); + sourceReadCount += 
sourceReader.get("value").asLong(); + sinkWriteCount += sinkWriter.get("value").asLong(); + } + return new JobMetricsSummary(sourceReadCount, sinkWriteCount); + // Add NullPointerException because of metrics information can be empty like {} + } catch (JsonProcessingException | NullPointerException e) { + return new JobMetricsSummary(sourceReadCount, sinkWriteCount); + } + } } diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java new file mode 100644 index 00000000000..fde09533485 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.engine.client.job; + +import org.apache.seatunnel.common.utils.DateTimeUtils; +import org.apache.seatunnel.common.utils.StringFormatUtils; +import org.apache.seatunnel.engine.client.SeaTunnelClient; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +import java.time.Duration; +import java.time.LocalDateTime; + +@Slf4j +public class JobMetricsRunner implements Runnable { + private final SeaTunnelClient seaTunnelClient; + private final Long jobId; + private LocalDateTime lastRunTime = LocalDateTime.now(); + private Long lastReadCount = 0L; + private Long lastWriteCount = 0L; + + public JobMetricsRunner(SeaTunnelClient seaTunnelClient, Long jobId) { + this.seaTunnelClient = seaTunnelClient; + this.jobId = jobId; + } + + @Override + public void run() { + Thread.currentThread().setName("job-metrics-runner-" + jobId); + try { + JobMetricsSummary jobMetricsSummary = seaTunnelClient.getJobMetricsSummary(jobId); + LocalDateTime now = LocalDateTime.now(); + long seconds = Duration.between(lastRunTime, now).getSeconds(); + long averageRead = (jobMetricsSummary.getSourceReadCount() - lastReadCount) / seconds; + long averageWrite = (jobMetricsSummary.getSinkWriteCount() - lastWriteCount) / seconds; + log.info(StringFormatUtils.formatTable( + "Job Progress Information", + "Read Count So Far", + jobMetricsSummary.getSourceReadCount(), + + "Write Count So Far", + jobMetricsSummary.getSinkWriteCount(), + + "Average Read Count", + averageRead + "/s", + + "Average Write Count", + averageWrite + "/s", + + "Last Statistic Time", + DateTimeUtils.toString(lastRunTime, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS), + + "Current Statistic Time", + DateTimeUtils.toString(now, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS))); + lastRunTime = now; + lastReadCount = jobMetricsSummary.getSourceReadCount(); + lastWriteCount = jobMetricsSummary.getSinkWriteCount(); + } catch (Exception e) { + log.warn("Failed to get 
job metrics summary, it maybe first-run"); + } + } + + @Data + @AllArgsConstructor + public static class JobMetricsSummary { + private long sourceReadCount; + private long sinkWriteCount; + } +} diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java index 365e4e8873a..0037d0df817 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java @@ -32,6 +32,8 @@ public class EngineConfig { private int backupCount = ServerConfigOptions.BACKUP_COUNT.defaultValue(); private int printExecutionInfoInterval = ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL.defaultValue(); + private int printJobMetricsInfoInterval = ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL.defaultValue(); + private SlotServiceConfig slotServiceConfig = ServerConfigOptions.SLOT_SERVICE.defaultValue(); private CheckpointConfig checkpointConfig = ServerConfigOptions.CHECKPOINT.defaultValue(); @@ -46,4 +48,8 @@ public void setPrintExecutionInfoInterval(int printExecutionInfoInterval) { this.printExecutionInfoInterval = printExecutionInfoInterval; } + public void setPrintJobMetricsInfoInterval(int printJobMetricsInfoInterval) { + checkPositive(printExecutionInfoInterval, ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL + " must be > 0"); + this.printJobMetricsInfoInterval = printJobMetricsInfoInterval; + } } diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java index 22e137b0969..229d40f1cc6 100644 --- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java @@ -98,6 +98,9 @@ private void parseEngineConfig(Node engineNode, SeaTunnelConfig config) { } else if (ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL.key().equals(name)) { engineConfig.setPrintExecutionInfoInterval(getIntegerValue(ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL.key(), getTextContent(node))); + } else if (ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL.key().equals(name)) { + engineConfig.setPrintJobMetricsInfoInterval(getIntegerValue(ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL.key(), + getTextContent(node))); } else if (ServerConfigOptions.SLOT_SERVICE.key().equals(name)) { engineConfig.setSlotServiceConfig(parseSlotServiceConfig(node)); } else if (ServerConfigOptions.CHECKPOINT.key().equals(name)) { diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java index 5dc5e0080e3..6b782bc8b5b 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java @@ -31,6 +31,8 @@ public class ServerConfigOptions { public static final Option PRINT_EXECUTION_INFO_INTERVAL = Options.key("print-execution-info-interval").intType().defaultValue(60).withDescription("The interval (in seconds) between two consecutive executions of the print execution info task."); + public static final Option PRINT_JOB_METRICS_INFO_INTERVAL = 
Options.key("print-job-metrics-info-interval").intType().defaultValue(60).withDescription("The interval (in seconds) of job print metrics info"); + public static final Option DYNAMIC_SLOT = Options.key("dynamic-slot").booleanType().defaultValue(true).withDescription("Whether to use dynamic slot."); public static final Option SLOT_NUM = Options.key("slot-num").intType().defaultValue(2).withDescription("The number of slots. Only valid when dynamic slot is disabled."); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java index d5d1fbf3cbe..dfc92ce38c0 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.api.common.metrics.JobMetrics; import org.apache.seatunnel.common.utils.ExceptionUtils; +import org.apache.seatunnel.common.utils.StringFormatUtils; import org.apache.seatunnel.engine.common.Constant; import org.apache.seatunnel.engine.common.config.EngineConfig; import org.apache.seatunnel.engine.common.exception.JobException; @@ -503,13 +504,7 @@ public void printExecutionInfo() { int poolSize = threadPoolExecutor.getPoolSize(); long completedTaskCount = threadPoolExecutor.getCompletedTaskCount(); long taskCount = threadPoolExecutor.getTaskCount(); - logger.info(String.format( - "\n" + "***********************************************" + - "\n" + " %s" + - "\n" + "***********************************************" + - "\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n" - + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n" - + "***********************************************\n", + logger.info(StringFormatUtils.formatTable( 
"CoordinatorService Thread Pool Status", "activeCount", activeCount,