From caac5ebcda4d5f55318e6dd432d401eca81ebdff Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Wed, 11 Jan 2023 01:14:45 +0800 Subject: [PATCH 1/7] [Feature][Zeta] Support print job metrics information --- config/seatunnel.yaml | 3 +- .../command/ClientExecuteCommand.java | 52 ++++++++++- .../engine/client/SeaTunnelClient.java | 25 +++++ .../engine/client/job/JobMetricsRunner.java | 92 +++++++++++++++++++ .../engine/common/config/EngineConfig.java | 6 ++ .../YamlSeaTunnelDomConfigProcessor.java | 3 + .../config/server/ServerConfigOptions.java | 2 + 7 files changed, 179 insertions(+), 4 deletions(-) create mode 100644 seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java diff --git a/config/seatunnel.yaml b/config/seatunnel.yaml index 5b7d7f57282..3e576e3d668 100644 --- a/config/seatunnel.yaml +++ b/config/seatunnel.yaml @@ -18,7 +18,8 @@ seatunnel: engine: backup-count: 1 - print-execution-info-interval: 10 + print-execution-info-interval: 60 + print-job-metrics-info-interval: 60 slot-service: dynamic-slot: true checkpoint: diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java index fd52bc2d0cf..23aceec77f4 100644 --- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java +++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.core.starter.seatunnel.command; import static org.apache.seatunnel.core.starter.utils.FileUtils.checkConfigExist; +import static org.apache.seatunnel.engine.client.job.JobMetricsRunner.DATETIME_FORMATTER; import org.apache.seatunnel.core.starter.command.Command; import 
org.apache.seatunnel.core.starter.enums.MasterType; @@ -27,6 +28,7 @@ import org.apache.seatunnel.engine.client.SeaTunnelClient; import org.apache.seatunnel.engine.client.job.ClientJobProxy; import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment; +import org.apache.seatunnel.engine.client.job.JobMetricsRunner; import org.apache.seatunnel.engine.common.config.ConfigProvider; import org.apache.seatunnel.engine.common.config.JobConfig; import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; @@ -38,8 +40,13 @@ import lombok.extern.slf4j.Slf4j; import java.nio.file.Path; +import java.time.Duration; +import java.time.LocalDateTime; import java.util.Random; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; /** * This command is used to execute the SeaTunnel engine job by SeaTunnel API. @@ -53,11 +60,12 @@ public ClientExecuteCommand(ClientCommandArgs clientCommandArgs) { this.clientCommandArgs = clientCommandArgs; } - @SuppressWarnings("checkstyle:RegexpSingleline") + @SuppressWarnings({"checkstyle:RegexpSingleline", "checkstyle:MagicNumber"}) @Override public void execute() throws CommandExecuteException { HazelcastInstance instance = null; SeaTunnelClient engineClient = null; + ScheduledExecutorService executorService = null; SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig(); try { String clusterName = clientCommandArgs.getClusterName(); @@ -89,9 +97,44 @@ public void execute() throws CommandExecuteException { engineClient.createExecutionContext(configFile.toString(), jobConfig); ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); - clientJobProxy.waitForJobComplete(); + // get job id long jobId = clientJobProxy.getJobId(); - System.out.println(engineClient.getJobMetrics(jobId)); + JobMetricsRunner jobMetricsRunner = new JobMetricsRunner(engineClient, jobId); + executorService = 
Executors.newSingleThreadScheduledExecutor(); + executorService.scheduleAtFixedRate(jobMetricsRunner, 0, + seaTunnelConfig.getEngineConfig().getPrintJobMetricsInfoInterval(), TimeUnit.SECONDS); + // get job start time + LocalDateTime startTime = LocalDateTime.now(); + clientJobProxy.waitForJobComplete(); + // get job end time + LocalDateTime endTime = LocalDateTime.now(); + // print job statistic information when job finished + JobMetricsRunner.JobMetricsSummary jobMetricsSummary = engineClient.getJobMetricsSummary(jobId); + log.info(String.format( + "\n" + "***********************************************" + + "\n" + " %s" + + "\n" + "***********************************************" + + "\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + + "***********************************************\n", + "Job Statistic Information", + "Start Time", + DATETIME_FORMATTER.format(startTime), + + "End Time", + DATETIME_FORMATTER.format(endTime), + + "Total Time(s)", + Duration.between(startTime, endTime).getSeconds(), + + "Total Read Count", + jobMetricsSummary.getSourceReadCount(), + + "Total Write Count", + jobMetricsSummary.getSinkWriteCount(), + + "Total Failed Count", + jobMetricsSummary.getSourceReadCount() - jobMetricsSummary.getSinkWriteCount())); } } catch (ExecutionException | InterruptedException e) { throw new CommandExecuteException("SeaTunnel job executed failed", e); @@ -102,6 +145,9 @@ public void execute() throws CommandExecuteException { if (instance != null) { instance.shutdown(); } + if (executorService != null) { + executorService.shutdown(); + } } } diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java index 691f25c6f1b..1f32890f91b 100644 --- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java +++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.engine.client.job.JobClient; import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment; +import org.apache.seatunnel.engine.client.job.JobMetricsRunner.JobMetricsSummary; import org.apache.seatunnel.engine.common.config.JobConfig; import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; import org.apache.seatunnel.engine.core.job.JobDAGInfo; @@ -31,11 +32,15 @@ import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelListJobStatusCodec; import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.hazelcast.client.config.ClientConfig; import com.hazelcast.logging.ILogger; import lombok.NonNull; public class SeaTunnelClient implements SeaTunnelClientInstance { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private final SeaTunnelHazelcastClient hazelcastClient; public SeaTunnelClient(@NonNull ClientConfig clientConfig) { @@ -127,4 +132,24 @@ public JobDAGInfo getJobInfo(Long jobId) { SeaTunnelGetJobInfoCodec::decodeResponse )); } + + public JobMetricsSummary getJobMetricsSummary(Long jobId) { + long sourceReadCount = 0L; + long sinkWriteCount = 0L; + String jobMetrics = getJobMetrics(jobId); + try { + JsonNode jsonNode = OBJECT_MAPPER.readTree(jobMetrics); + JsonNode sourceReaders = jsonNode.get("SourceReceivedCount"); + JsonNode sinkWriters = jsonNode.get("SinkWriteCount"); + for (int i = 0; i < sourceReaders.size(); i++) { + JsonNode sourceReader = sourceReaders.get(i); + JsonNode sinkWriter = sinkWriters.get(i); + sourceReadCount += 
sourceReader.get("value").asLong(); + sinkWriteCount += sinkWriter.get("value").asLong(); + } + return new JobMetricsSummary(sourceReadCount, sinkWriteCount); + } catch (JsonProcessingException e) { + return new JobMetricsSummary(sourceReadCount, sinkWriteCount); + } + } } diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java new file mode 100644 index 00000000000..edd2613b587 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.engine.client.job; + +import org.apache.seatunnel.engine.client.SeaTunnelClient; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; + +@Slf4j +public class JobMetricsRunner extends Thread { + public static final DateTimeFormatter DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + private final SeaTunnelClient seaTunnelClient; + private final Long jobId; + private LocalDateTime lastRunTime = LocalDateTime.now(); + private Long lastReadCount = 0L; + private Long lastWriteCount = 0L; + + public JobMetricsRunner(SeaTunnelClient seaTunnelClient, Long jobId) { + this.seaTunnelClient = seaTunnelClient; + this.jobId = jobId; + } + + @Override + public void run() { + Thread.currentThread().setName("job-metrics-runner-" + jobId); + try { + JobMetricsSummary jobMetricsSummary = seaTunnelClient.getJobMetricsSummary(jobId); + LocalDateTime now = LocalDateTime.now(); + long seconds = Duration.between(lastRunTime, now).getSeconds(); + long averageRead = (jobMetricsSummary.getSourceReadCount() - lastReadCount) / seconds; + long averageWrite = (jobMetricsSummary.getSinkWriteCount() - lastWriteCount) / seconds; + log.info(String.format( + "\n" + "***********************************************" + + "\n" + " %s" + + "\n" + "***********************************************" + + "\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + + "***********************************************\n", + "Job Progress Information", + "Read Count So Far", + jobMetricsSummary.getSourceReadCount(), + + "Write Count So Far", + jobMetricsSummary.getSinkWriteCount(), + + "Average Read Count", + averageRead + "/s", + + "Average Write Count", + averageWrite + "/s", + + "Last Run Time", + DATETIME_FORMATTER.format(lastRunTime), + + "Current 
Time", + DATETIME_FORMATTER.format(now))); + lastRunTime = now; + lastReadCount = jobMetricsSummary.getSourceReadCount(); + lastWriteCount = jobMetricsSummary.getSinkWriteCount(); + } catch (Exception e) { + log.warn("Failed to get job metrics summary, it maybe first-run"); + } + } + + @Data + @AllArgsConstructor + public static class JobMetricsSummary { + private long sourceReadCount; + private long sinkWriteCount; + } +} diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java index 365e4e8873a..0037d0df817 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java @@ -32,6 +32,8 @@ public class EngineConfig { private int backupCount = ServerConfigOptions.BACKUP_COUNT.defaultValue(); private int printExecutionInfoInterval = ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL.defaultValue(); + private int printJobMetricsInfoInterval = ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL.defaultValue(); + private SlotServiceConfig slotServiceConfig = ServerConfigOptions.SLOT_SERVICE.defaultValue(); private CheckpointConfig checkpointConfig = ServerConfigOptions.CHECKPOINT.defaultValue(); @@ -46,4 +48,8 @@ public void setPrintExecutionInfoInterval(int printExecutionInfoInterval) { this.printExecutionInfoInterval = printExecutionInfoInterval; } + public void setPrintJobMetricsInfoInterval(int printJobMetricsInfoInterval) { + checkPositive(printJobMetricsInfoInterval, ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL + " must be > 0"); + this.printJobMetricsInfoInterval = printJobMetricsInfoInterval; + } } diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java index 22e137b0969..229d40f1cc6 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java @@ -98,6 +98,9 @@ private void parseEngineConfig(Node engineNode, SeaTunnelConfig config) { } else if (ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL.key().equals(name)) { engineConfig.setPrintExecutionInfoInterval(getIntegerValue(ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL.key(), getTextContent(node))); + } else if (ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL.key().equals(name)) { + engineConfig.setPrintJobMetricsInfoInterval(getIntegerValue(ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL.key(), + getTextContent(node))); } else if (ServerConfigOptions.SLOT_SERVICE.key().equals(name)) { engineConfig.setSlotServiceConfig(parseSlotServiceConfig(node)); } else if (ServerConfigOptions.CHECKPOINT.key().equals(name)) { diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java index 5dc5e0080e3..66ad1a5efaa 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java @@ -31,6 +31,8 @@ public class ServerConfigOptions { public static final Option 
PRINT_EXECUTION_INFO_INTERVAL = Options.key("print-execution-info-interval").intType().defaultValue(60).withDescription("The interval (in seconds) between two consecutive executions of the print execution info task."); + public static final Option PRINT_JOB_METRICS_INFO_INTERVAL = Options.key("print-job-metrics-info-interval").intType().defaultValue(5).withDescription("The interval (in seconds) of job print metrics info"); + public static final Option DYNAMIC_SLOT = Options.key("dynamic-slot").booleanType().defaultValue(true).withDescription("Whether to use dynamic slot."); public static final Option SLOT_NUM = Options.key("slot-num").intType().defaultValue(2).withDescription("The number of slots. Only valid when dynamic slot is disabled."); From d9dccf7dd63ee9da1ae039498429525a83abe0a2 Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Wed, 11 Jan 2023 01:15:16 +0800 Subject: [PATCH 2/7] [Feature][Zeta] Support print job metrics information --- .../engine/common/config/server/ServerConfigOptions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java index 66ad1a5efaa..6b782bc8b5b 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java @@ -31,7 +31,7 @@ public class ServerConfigOptions { public static final Option PRINT_EXECUTION_INFO_INTERVAL = Options.key("print-execution-info-interval").intType().defaultValue(60).withDescription("The interval (in seconds) between two consecutive executions of the print execution info task."); - public static final Option 
PRINT_JOB_METRICS_INFO_INTERVAL = Options.key("print-job-metrics-info-interval").intType().defaultValue(5).withDescription("The interval (in seconds) of job print metrics info"); + public static final Option PRINT_JOB_METRICS_INFO_INTERVAL = Options.key("print-job-metrics-info-interval").intType().defaultValue(60).withDescription("The interval (in seconds) of job print metrics info"); public static final Option DYNAMIC_SLOT = Options.key("dynamic-slot").booleanType().defaultValue(true).withDescription("Whether to use dynamic slot."); From bcbe34c96359ddb7426804a083ad34e1ea29c407 Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Wed, 11 Jan 2023 01:39:30 +0800 Subject: [PATCH 3/7] [Feature][Zeta] Optimize print --- .../command/ClientExecuteCommand.java | 35 +++++++++++-------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java index 23aceec77f4..f3909cfd6a2 100644 --- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java +++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java @@ -66,6 +66,9 @@ public void execute() throws CommandExecuteException { HazelcastInstance instance = null; SeaTunnelClient engineClient = null; ScheduledExecutorService executorService = null; + JobMetricsRunner.JobMetricsSummary jobMetricsSummary = null; + LocalDateTime startTime = null; + LocalDateTime endTime = null; SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig(); try { String clusterName = clientCommandArgs.getClusterName(); @@ -104,12 +107,26 @@ public void execute() throws CommandExecuteException { executorService.scheduleAtFixedRate(jobMetricsRunner, 
0, seaTunnelConfig.getEngineConfig().getPrintJobMetricsInfoInterval(), TimeUnit.SECONDS); // get job start time - LocalDateTime startTime = LocalDateTime.now(); + startTime = LocalDateTime.now(); clientJobProxy.waitForJobComplete(); // get job end time - LocalDateTime endTime = LocalDateTime.now(); + endTime = LocalDateTime.now(); // print job statistic information when job finished - JobMetricsRunner.JobMetricsSummary jobMetricsSummary = engineClient.getJobMetricsSummary(jobId); + jobMetricsSummary = engineClient.getJobMetricsSummary(jobId); + } + } catch (ExecutionException | InterruptedException e) { + throw new CommandExecuteException("SeaTunnel job executed failed", e); + } finally { + if (engineClient != null) { + engineClient.close(); + } + if (instance != null) { + instance.shutdown(); + } + if (executorService != null) { + executorService.shutdown(); + } + if (jobMetricsSummary != null) { log.info(String.format( "\n" + "***********************************************" + "\n" + " %s" + @@ -136,18 +153,6 @@ public void execute() throws CommandExecuteException { "Total Failed Count", jobMetricsSummary.getSourceReadCount() - jobMetricsSummary.getSinkWriteCount())); } - } catch (ExecutionException | InterruptedException e) { - throw new CommandExecuteException("SeaTunnel job executed failed", e); - } finally { - if (engineClient != null) { - engineClient.close(); - } - if (instance != null) { - instance.shutdown(); - } - if (executorService != null) { - executorService.shutdown(); - } } } From ba744b92c2538b7992c8eeb0e14c580eb04b6e5a Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Wed, 11 Jan 2023 11:25:01 +0800 Subject: [PATCH 4/7] [Feature][Zeta] Improve code structure --- .../common/utils/StringFormatUtils.java | 40 +++++++++++++ .../common/utils/StringFormatUtilsTest.java | 56 +++++++++++++++++++ .../command/ClientExecuteCommand.java | 19 +++---- .../engine/client/job/JobMetricsRunner.java | 20 +++---- .../engine/server/CoordinatorService.java | 9 +-- 5 
files changed, 112 insertions(+), 32 deletions(-) create mode 100644 seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/StringFormatUtils.java create mode 100644 seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/StringFormatUtilsTest.java diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/StringFormatUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/StringFormatUtils.java new file mode 100644 index 00000000000..cad9d667c1b --- /dev/null +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/StringFormatUtils.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.common.utils; + +import java.util.Collections; + +public class StringFormatUtils { + private static final int NUM = 47; + + private StringFormatUtils() { + // utility class can not be instantiated + } + + public static String formatTable(Object... 
objects) { + String title = objects[0].toString(); + int blankNum = Math.max(0, (NUM - title.length()) / 2); // nCopies rejects negative counts + int kvNum = (objects.length - 1) / 2; + String template = "\n" + "***********************************************" + + "\n" + String.join("", Collections.nCopies(blankNum, " ")) + "%s" + + "\n" + "***********************************************" + + "\n" + String.join("", Collections.nCopies(kvNum, "%-26s: %19s\n")) + + "***********************************************\n"; + return String.format(template, objects); + } +} diff --git a/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/StringFormatUtilsTest.java b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/StringFormatUtilsTest.java new file mode 100644 index 00000000000..ce1632ac5f4 --- /dev/null +++ b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/StringFormatUtilsTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.common.utils; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class StringFormatUtilsTest { + @Test + public void testStringFormat() { + String s = StringFormatUtils.formatTable("Job Statistic Information", + "Start Time", + "2023-01-11 00:00:00", + + "End Time", + "2023-01-11 00:00:00", + + "Total Time(s)", + 0, + + "Total Read Count", + 0, + + "Total Write Count", + 0, + + "Total Failed Count", + 0); + Assertions.assertEquals("\n" + + "***********************************************\n" + + " Job Statistic Information\n" + + "***********************************************\n" + + "Start Time : 2023-01-11 00:00:00\n" + + "End Time : 2023-01-11 00:00:00\n" + + "Total Time(s) : 0\n" + + "Total Read Count : 0\n" + + "Total Write Count : 0\n" + + "Total Failed Count : 0\n" + + "***********************************************\n", s); + } +} diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java index f3909cfd6a2..ad2e6265335 100644 --- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java +++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java @@ -18,8 +18,9 @@ package org.apache.seatunnel.core.starter.seatunnel.command; import static org.apache.seatunnel.core.starter.utils.FileUtils.checkConfigExist; -import static org.apache.seatunnel.engine.client.job.JobMetricsRunner.DATETIME_FORMATTER; +import org.apache.seatunnel.common.utils.DateTimeUtils; +import org.apache.seatunnel.common.utils.StringFormatUtils; import org.apache.seatunnel.core.starter.command.Command; import org.apache.seatunnel.core.starter.enums.MasterType; import 
org.apache.seatunnel.core.starter.exception.CommandExecuteException; @@ -67,8 +68,8 @@ public void execute() throws CommandExecuteException { SeaTunnelClient engineClient = null; ScheduledExecutorService executorService = null; JobMetricsRunner.JobMetricsSummary jobMetricsSummary = null; - LocalDateTime startTime = null; - LocalDateTime endTime = null; + LocalDateTime startTime = LocalDateTime.now(); + LocalDateTime endTime = LocalDateTime.now(); SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig(); try { String clusterName = clientCommandArgs.getClusterName(); @@ -127,19 +128,13 @@ public void execute() throws CommandExecuteException { executorService.shutdown(); } if (jobMetricsSummary != null) { - log.info(String.format( - "\n" + "***********************************************" + - "\n" + " %s" + - "\n" + "***********************************************" + - "\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n" - + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n" - + "***********************************************\n", + log.info(StringFormatUtils.formatTable( "Job Statistic Information", "Start Time", - DATETIME_FORMATTER.format(startTime), + DateTimeUtils.toString(startTime, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS), "End Time", - DATETIME_FORMATTER.format(endTime), + DateTimeUtils.toString(endTime, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS), "Total Time(s)", Duration.between(startTime, endTime).getSeconds(), diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java index edd2613b587..5000f0f6d6c 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java +++ 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.engine.client.job; +import org.apache.seatunnel.common.utils.DateTimeUtils; +import org.apache.seatunnel.common.utils.StringFormatUtils; import org.apache.seatunnel.engine.client.SeaTunnelClient; import lombok.AllArgsConstructor; @@ -25,11 +27,9 @@ import java.time.Duration; import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; @Slf4j public class JobMetricsRunner extends Thread { - public static final DateTimeFormatter DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); private final SeaTunnelClient seaTunnelClient; private final Long jobId; private LocalDateTime lastRunTime = LocalDateTime.now(); @@ -50,13 +50,7 @@ public void run() { long seconds = Duration.between(lastRunTime, now).getSeconds(); long averageRead = (jobMetricsSummary.getSourceReadCount() - lastReadCount) / seconds; long averageWrite = (jobMetricsSummary.getSinkWriteCount() - lastWriteCount) / seconds; - log.info(String.format( - "\n" + "***********************************************" + - "\n" + " %s" + - "\n" + "***********************************************" + - "\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n" - + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n" - + "***********************************************\n", + log.info(StringFormatUtils.formatTable( "Job Progress Information", "Read Count So Far", jobMetricsSummary.getSourceReadCount(), @@ -70,11 +64,11 @@ public void run() { "Average Write Count", averageWrite + "/s", - "Last Run Time", - DATETIME_FORMATTER.format(lastRunTime), + "Last Statistic Time", + DateTimeUtils.toString(lastRunTime, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS), - "Current Time", - DATETIME_FORMATTER.format(now))); + "Current Statistic Time", + DateTimeUtils.toString(now, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS))); lastRunTime = now; 
lastReadCount = jobMetricsSummary.getSourceReadCount(); lastWriteCount = jobMetricsSummary.getSinkWriteCount(); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java index d5d1fbf3cbe..dfc92ce38c0 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.api.common.metrics.JobMetrics; import org.apache.seatunnel.common.utils.ExceptionUtils; +import org.apache.seatunnel.common.utils.StringFormatUtils; import org.apache.seatunnel.engine.common.Constant; import org.apache.seatunnel.engine.common.config.EngineConfig; import org.apache.seatunnel.engine.common.exception.JobException; @@ -503,13 +504,7 @@ public void printExecutionInfo() { int poolSize = threadPoolExecutor.getPoolSize(); long completedTaskCount = threadPoolExecutor.getCompletedTaskCount(); long taskCount = threadPoolExecutor.getTaskCount(); - logger.info(String.format( - "\n" + "***********************************************" + - "\n" + " %s" + - "\n" + "***********************************************" + - "\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n" - + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n" - + "***********************************************\n", + logger.info(StringFormatUtils.formatTable( "CoordinatorService Thread Pool Status", "activeCount", activeCount, From c170210e92a9b235546a0dfb89ceb11d9b652468 Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Wed, 11 Jan 2023 15:07:07 +0800 Subject: [PATCH 5/7] [Feature][Zeta] Fix integration test --- .../org/apache/seatunnel/engine/client/SeaTunnelClient.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) 
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java index 1f32890f91b..7ed4ab94903 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java +++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java @@ -148,7 +148,8 @@ public JobMetricsSummary getJobMetricsSummary(Long jobId) { sinkWriteCount += sinkWriter.get("value").asLong(); } return new JobMetricsSummary(sourceReadCount, sinkWriteCount); - } catch (JsonProcessingException e) { + // Add NullPointerException because of metrics information can be empty like {} + } catch (JsonProcessingException | NullPointerException e) { return new JobMetricsSummary(sourceReadCount, sinkWriteCount); } } From ee715121e6a7d5270c663bb347b3a4bb9c03406c Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Wed, 11 Jan 2023 21:28:49 +0800 Subject: [PATCH 6/7] [Feature][Zeta] Optimize code --- .../apache/seatunnel/engine/client/job/JobMetricsRunner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java index 5000f0f6d6c..fde09533485 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java +++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java @@ -29,7 +29,7 @@ import java.time.LocalDateTime; @Slf4j -public class JobMetricsRunner extends Thread { +public class JobMetricsRunner implements Runnable { private final SeaTunnelClient seaTunnelClient; private 
final Long jobId; private LocalDateTime lastRunTime = LocalDateTime.now(); From c9432a542f1fed598038d33fc244f8cad54610ca Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Thu, 12 Jan 2023 10:26:30 +0800 Subject: [PATCH 7/7] [Feature][Zeta] Optimize code --- .../seatunnel/command/ClientExecuteCommand.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java index ad2e6265335..0fbd3ee5e2d 100644 --- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java +++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java @@ -99,7 +99,9 @@ public void execute() throws CommandExecuteException { jobConfig.setName(clientCommandArgs.getJobName()); JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(configFile.toString(), jobConfig); - + // get job start time + startTime = LocalDateTime.now(); + // create job proxy ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); // get job id long jobId = clientJobProxy.getJobId(); @@ -107,12 +109,11 @@ public void execute() throws CommandExecuteException { executorService = Executors.newSingleThreadScheduledExecutor(); executorService.scheduleAtFixedRate(jobMetricsRunner, 0, seaTunnelConfig.getEngineConfig().getPrintJobMetricsInfoInterval(), TimeUnit.SECONDS); - // get job start time - startTime = LocalDateTime.now(); + // wait for job complete clientJobProxy.waitForJobComplete(); // get job end time endTime = LocalDateTime.now(); - // print job statistic information when job finished + // get job statistic information when job finished jobMetricsSummary = engineClient.getJobMetricsSummary(jobId); } 
} catch (ExecutionException | InterruptedException e) { @@ -128,6 +129,7 @@ public void execute() throws CommandExecuteException { executorService.shutdown(); } if (jobMetricsSummary != null) { + // print job statistics information when job finished log.info(StringFormatUtils.formatTable( "Job Statistic Information", "Start Time",