Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Zeta] Zeta support print metrics information #3913

Merged
merged 7 commits into from
Jan 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion config/seatunnel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
seatunnel:
engine:
backup-count: 1
print-execution-info-interval: 10
print-execution-info-interval: 60
print-job-metrics-info-interval: 60
slot-service:
dynamic-slot: true
checkpoint:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.common.utils;

import java.util.Collections;

/**
 * Renders a title and a list of key/value pairs as a fixed-width text table
 * framed by lines of asterisks.
 */
public class StringFormatUtils {
    /** Table width in characters: 26 (key column) + 2 (": ") + 19 (value column). */
    private static final int NUM = 47;

    /** Horizontal border, exactly {@link #NUM} asterisks wide. */
    private static final String BORDER = String.join("", Collections.nCopies(NUM, "*"));

    /** One table row: left-aligned key, right-aligned value. */
    private static final String ROW_TEMPLATE = "%-26s: %19s\n";

    private StringFormatUtils() {
        // utility class cannot be instantiated
    }

    /**
     * Formats a title followed by key/value pairs as a bordered table.
     *
     * <p>The first argument is the title and is roughly centered within the table width.
     * The remaining arguments are consumed pairwise as {@code key, value}; a trailing key
     * without a value is ignored. Keys and values wider than their column are not truncated.
     *
     * @param objects the title followed by alternating keys and values
     * @return the rendered table, starting with a line break and ending with one
     * @throws IllegalArgumentException if no title is supplied
     */
    public static String formatTable(Object... objects) {
        if (objects == null || objects.length == 0) {
            throw new IllegalArgumentException("formatTable requires at least a title argument");
        }
        // String.valueOf renders a null title as "null", matching how %s renders null values.
        String title = String.valueOf(objects[0]);
        // A title wider than the table would yield a negative count, which nCopies rejects.
        int blankNum = Math.max(0, (NUM - title.length()) / 2);
        int kvNum = (objects.length - 1) / 2;
        String template = "\n" + BORDER
            + "\n" + String.join("", Collections.nCopies(blankNum, " ")) + "%s"
            + "\n" + BORDER
            + "\n" + String.join("", Collections.nCopies(kvNum, ROW_TEMPLATE))
            + BORDER + "\n";
        return String.format(template, objects);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.common.utils;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Unit test for {@link StringFormatUtils#formatTable(Object...)}.
 */
public class StringFormatUtilsTest {
    /**
     * Formats a title plus six key/value pairs and checks the exact rendered table:
     * the title indented by 11 blanks, keys left-aligned in 26 columns and values
     * right-aligned in 19 columns.
     */
    @Test
    public void testStringFormat() {
        String s = StringFormatUtils.formatTable("Job Statistic Information",
            "Start Time",
            "2023-01-11 00:00:00",

            "End Time",
            "2023-01-11 00:00:00",

            "Total Time(s)",
            0,

            "Total Read Count",
            0,

            "Total Write Count",
            0,

            "Total Failed Count",
            0);
        // JUnit expects (expected, actual); this order keeps failure messages truthful.
        Assertions.assertEquals("\n" +
            "***********************************************\n" +
            "           Job Statistic Information\n" +
            "***********************************************\n" +
            "Start Time                : 2023-01-11 00:00:00\n" +
            "End Time                  : 2023-01-11 00:00:00\n" +
            "Total Time(s)             :                   0\n" +
            "Total Read Count          :                   0\n" +
            "Total Write Count         :                   0\n" +
            "Total Failed Count        :                   0\n" +
            "***********************************************\n", s);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import static org.apache.seatunnel.core.starter.utils.FileUtils.checkConfigExist;

import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.StringFormatUtils;
import org.apache.seatunnel.core.starter.command.Command;
import org.apache.seatunnel.core.starter.enums.MasterType;
import org.apache.seatunnel.core.starter.exception.CommandExecuteException;
Expand All @@ -27,6 +29,7 @@
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.JobMetricsRunner;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
Expand All @@ -38,8 +41,13 @@
import lombok.extern.slf4j.Slf4j;

import java.nio.file.Path;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* This command is used to execute the SeaTunnel engine job by SeaTunnel API.
Expand All @@ -53,11 +61,15 @@ public ClientExecuteCommand(ClientCommandArgs clientCommandArgs) {
this.clientCommandArgs = clientCommandArgs;
}

@SuppressWarnings("checkstyle:RegexpSingleline")
@SuppressWarnings({"checkstyle:RegexpSingleline", "checkstyle:MagicNumber"})
@Override
public void execute() throws CommandExecuteException {
HazelcastInstance instance = null;
SeaTunnelClient engineClient = null;
ScheduledExecutorService executorService = null;
JobMetricsRunner.JobMetricsSummary jobMetricsSummary = null;
LocalDateTime startTime = LocalDateTime.now();
LocalDateTime endTime = LocalDateTime.now();
SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
try {
String clusterName = clientCommandArgs.getClusterName();
Expand Down Expand Up @@ -87,11 +99,22 @@ public void execute() throws CommandExecuteException {
jobConfig.setName(clientCommandArgs.getJobName());
JobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(configFile.toString(), jobConfig);

// get job start time
startTime = LocalDateTime.now();
// create job proxy
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
clientJobProxy.waitForJobComplete();
// get job id
long jobId = clientJobProxy.getJobId();
System.out.println(engineClient.getJobMetrics(jobId));
JobMetricsRunner jobMetricsRunner = new JobMetricsRunner(engineClient, jobId);
executorService = Executors.newSingleThreadScheduledExecutor();
executorService.scheduleAtFixedRate(jobMetricsRunner, 0,
seaTunnelConfig.getEngineConfig().getPrintJobMetricsInfoInterval(), TimeUnit.SECONDS);
// wait for job complete
clientJobProxy.waitForJobComplete();
// get job end time
endTime = LocalDateTime.now();
// get job statistic information when job finished
jobMetricsSummary = engineClient.getJobMetricsSummary(jobId);
}
} catch (ExecutionException | InterruptedException e) {
throw new CommandExecuteException("SeaTunnel job executed failed", e);
Expand All @@ -102,6 +125,31 @@ public void execute() throws CommandExecuteException {
if (instance != null) {
instance.shutdown();
}
if (executorService != null) {
executorService.shutdown();
}
if (jobMetricsSummary != null) {
// print job statistics information when job finished
log.info(StringFormatUtils.formatTable(
"Job Statistic Information",
"Start Time",
DateTimeUtils.toString(startTime, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS),

"End Time",
DateTimeUtils.toString(endTime, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS),

"Total Time(s)",
Duration.between(startTime, endTime).getSeconds(),

"Total Read Count",
jobMetricsSummary.getSourceReadCount(),

"Total Write Count",
jobMetricsSummary.getSinkWriteCount(),

"Total Failed Count",
jobMetricsSummary.getSourceReadCount() - jobMetricsSummary.getSinkWriteCount()));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.engine.client.job.JobClient;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.JobMetricsRunner.JobMetricsSummary;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
Expand All @@ -31,11 +32,15 @@
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelListJobStatusCodec;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.logging.ILogger;
import lombok.NonNull;

public class SeaTunnelClient implements SeaTunnelClientInstance {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final SeaTunnelHazelcastClient hazelcastClient;

public SeaTunnelClient(@NonNull ClientConfig clientConfig) {
Expand Down Expand Up @@ -127,4 +132,25 @@ public JobDAGInfo getJobInfo(Long jobId) {
SeaTunnelGetJobInfoCodec::decodeResponse
));
}

/**
 * Fetches the metrics JSON of a job and totals the {@code value} fields found under
 * {@code SourceReceivedCount} and {@code SinkWriteCount}.
 *
 * <p>The two metric lists are summed independently, so a job with a different number of
 * source and sink entries is still counted in full. Missing, empty (for example
 * {@code {}}) or unparseable metrics produce zero counts instead of an exception.
 *
 * @param jobId id of the job, passed to {@code getJobMetrics}
 * @return the total read and write counts; never {@code null}
 */
public JobMetricsSummary getJobMetricsSummary(Long jobId) {
    String jobMetrics = getJobMetrics(jobId);
    if (jobMetrics == null) {
        return new JobMetricsSummary(0L, 0L);
    }
    try {
        JsonNode root = OBJECT_MAPPER.readTree(jobMetrics);
        if (root == null) {
            return new JobMetricsSummary(0L, 0L);
        }
        return new JobMetricsSummary(
            sumMetricValues(root.get("SourceReceivedCount")),
            sumMetricValues(root.get("SinkWriteCount")));
    } catch (JsonProcessingException e) {
        // Metrics that cannot be parsed are reported as "nothing counted yet".
        return new JobMetricsSummary(0L, 0L);
    }
}

/**
 * Sums the {@code value} field of every element of a JSON array of metric entries.
 *
 * @param metrics the array node, possibly {@code null} when the metric is absent
 * @return the total, or 0 when the node is absent or not an array
 */
private static long sumMetricValues(JsonNode metrics) {
    if (metrics == null || !metrics.isArray()) {
        return 0L;
    }
    long total = 0L;
    for (JsonNode metric : metrics) {
        // path() yields a "missing" node for an absent field, whose asLong() is 0.
        total += metric.path("value").asLong();
    }
    return total;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.engine.client.job;

import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.StringFormatUtils;
import org.apache.seatunnel.engine.client.SeaTunnelClient;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.time.Duration;
import java.time.LocalDateTime;

/**
 * Periodic task that fetches the metrics summary of one job and logs a progress table
 * with the totals so far and the average read/write rate since the previous run.
 */
@Slf4j
public class JobMetricsRunner implements Runnable {
    private final SeaTunnelClient seaTunnelClient;
    private final Long jobId;
    // State carried from one run to the next to compute per-interval averages.
    private LocalDateTime lastRunTime = LocalDateTime.now();
    private long lastReadCount = 0L;
    private long lastWriteCount = 0L;

    /**
     * @param seaTunnelClient client used to query the job metrics summary
     * @param jobId id of the job to report on
     */
    public JobMetricsRunner(SeaTunnelClient seaTunnelClient, Long jobId) {
        this.seaTunnelClient = seaTunnelClient;
        this.jobId = jobId;
    }

    /**
     * Logs one progress report and remembers the current totals for the next run.
     * Any failure is logged and swallowed so that a scheduler keeps invoking this task.
     */
    @Override
    public void run() {
        Thread.currentThread().setName("job-metrics-runner-" + jobId);
        try {
            JobMetricsSummary jobMetricsSummary = seaTunnelClient.getJobMetricsSummary(jobId);
            long readCount = jobMetricsSummary.getSourceReadCount();
            long writeCount = jobMetricsSummary.getSinkWriteCount();
            LocalDateTime now = LocalDateTime.now();
            // Clamp to one second: the first run can happen less than a second after
            // construction, and a zero divisor would abort the whole report.
            long seconds = Math.max(1L, Duration.between(lastRunTime, now).getSeconds());
            long averageRead = (readCount - lastReadCount) / seconds;
            long averageWrite = (writeCount - lastWriteCount) / seconds;
            log.info(StringFormatUtils.formatTable(
                "Job Progress Information",
                "Read Count So Far",
                readCount,

                "Write Count So Far",
                writeCount,

                "Average Read Count",
                averageRead + "/s",

                "Average Write Count",
                averageWrite + "/s",

                "Last Statistic Time",
                DateTimeUtils.toString(lastRunTime, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS),

                "Current Statistic Time",
                DateTimeUtils.toString(now, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS)));
            lastRunTime = now;
            lastReadCount = readCount;
            lastWriteCount = writeCount;
        } catch (Exception e) {
            // Keep the cause in the log; an exception escaping a fixed-rate task would
            // suppress all of its subsequent executions.
            log.warn("Failed to get job metrics summary", e);
        }
    }

    /** Total number of rows read by sources and written by sinks for one job. */
    @Data
    @AllArgsConstructor
    public static class JobMetricsSummary {
        private long sourceReadCount;
        private long sinkWriteCount;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public class EngineConfig {
private int backupCount = ServerConfigOptions.BACKUP_COUNT.defaultValue();
private int printExecutionInfoInterval = ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL.defaultValue();

/** Interval between job metrics printouts; initialised from the option's default value. */
private int printJobMetricsInfoInterval = ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL.defaultValue();

private SlotServiceConfig slotServiceConfig = ServerConfigOptions.SLOT_SERVICE.defaultValue();

private CheckpointConfig checkpointConfig = ServerConfigOptions.CHECKPOINT.defaultValue();
Expand All @@ -46,4 +48,8 @@ public void setPrintExecutionInfoInterval(int printExecutionInfoInterval) {
this.printExecutionInfoInterval = printExecutionInfoInterval;
}

/**
 * Sets the interval between job metrics printouts.
 *
 * @param printJobMetricsInfoInterval the new interval; must be greater than zero,
 *                                    otherwise it is rejected by {@code checkPositive}
 */
public void setPrintJobMetricsInfoInterval(int printJobMetricsInfoInterval) {
    // Validate the incoming value, not the unrelated printExecutionInfoInterval field.
    checkPositive(printJobMetricsInfoInterval, ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL + " must be > 0");
    this.printJobMetricsInfoInterval = printJobMetricsInfoInterval;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ private void parseEngineConfig(Node engineNode, SeaTunnelConfig config) {
} else if (ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL.key().equals(name)) {
engineConfig.setPrintExecutionInfoInterval(getIntegerValue(ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL.key(),
getTextContent(node)));
} else if (ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL.key().equals(name)) {
engineConfig.setPrintJobMetricsInfoInterval(getIntegerValue(ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL.key(),
getTextContent(node)));
} else if (ServerConfigOptions.SLOT_SERVICE.key().equals(name)) {
engineConfig.setSlotServiceConfig(parseSlotServiceConfig(node));
} else if (ServerConfigOptions.CHECKPOINT.key().equals(name)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public class ServerConfigOptions {

public static final Option<Integer> PRINT_EXECUTION_INFO_INTERVAL = Options.key("print-execution-info-interval").intType().defaultValue(60).withDescription("The interval (in seconds) between two consecutive executions of the print execution info task.");

/**
 * Option {@code print-job-metrics-info-interval}: how often, in seconds, job metrics
 * information is printed. Defaults to 60.
 */
public static final Option<Integer> PRINT_JOB_METRICS_INFO_INTERVAL = Options.key("print-job-metrics-info-interval").intType().defaultValue(60).withDescription("The interval (in seconds) of job print metrics info");

public static final Option<Boolean> DYNAMIC_SLOT = Options.key("dynamic-slot").booleanType().defaultValue(true).withDescription("Whether to use dynamic slot.");

public static final Option<Integer> SLOT_NUM = Options.key("slot-num").intType().defaultValue(2).withDescription("The number of slots. Only valid when dynamic slot is disabled.");
Expand Down
Loading