diff --git a/docs/en/seatunnel-engine/rest-api.md b/docs/en/seatunnel-engine/rest-api.md index 99bba92dae0..eb5eacfed13 100644 --- a/docs/en/seatunnel-engine/rest-api.md +++ b/docs/en/seatunnel-engine/rest-api.md @@ -192,8 +192,22 @@ This API has been deprecated, please use /hazelcast/rest/maps/job-info/:jobId in ] }, "metrics": { - "sourceReceivedCount": "", - "sinkWriteCount": "" + "SourceReceivedCount": "", + "SourceReceivedQPS": "", + "SourceReceivedBytes": "", + "SourceReceivedBytesPerSeconds": "", + "SinkWriteCount": "", + "SinkWriteQPS": "", + "SinkWriteBytes": "", + "SinkWriteBytesPerSeconds": "", + "TableSourceReceivedCount": {}, + "TableSourceReceivedBytes": {}, + "TableSourceReceivedBytesPerSeconds": {}, + "TableSourceReceivedQPS": {}, + "TableSinkWriteCount": {}, + "TableSinkWriteQPS": {}, + "TableSinkWriteBytes": {}, + "TableSinkWriteBytesPerSeconds": {} }, "finishedTime": "", "errorMsg": null, diff --git a/docs/zh/seatunnel-engine/rest-api.md b/docs/zh/seatunnel-engine/rest-api.md index 1b0166425ba..d38ad61268c 100644 --- a/docs/zh/seatunnel-engine/rest-api.md +++ b/docs/zh/seatunnel-engine/rest-api.md @@ -134,8 +134,22 @@ network: ] }, "metrics": { - "sourceReceivedCount": "", - "sinkWriteCount": "" + "SourceReceivedCount": "", + "SourceReceivedQPS": "", + "SourceReceivedBytes": "", + "SourceReceivedBytesPerSeconds": "", + "SinkWriteCount": "", + "SinkWriteQPS": "", + "SinkWriteBytes": "", + "SinkWriteBytesPerSeconds": "", + "TableSourceReceivedCount": {}, + "TableSourceReceivedBytes": {}, + "TableSourceReceivedBytesPerSeconds": {}, + "TableSourceReceivedQPS": {}, + "TableSinkWriteCount": {}, + "TableSinkWriteQPS": {}, + "TableSinkWriteBytes": {}, + "TableSinkWriteBytesPerSeconds": {} }, "finishedTime": "", "errorMsg": null, diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java index 59942eb4cc8..61df054d074 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java @@ -35,6 +35,7 @@ import com.hazelcast.client.config.ClientConfig; import com.hazelcast.instance.impl.HazelcastInstanceImpl; +import io.restassured.response.Response; import java.util.Collections; import java.util.concurrent.TimeUnit; @@ -82,33 +83,101 @@ public void multiTableMetrics() { Collections.singletonList(node1) .forEach( instance -> { - given().get( - HOST - + instance.getCluster() - .getLocalMember() - .getAddress() - .getPort() - + RestConstant.JOB_INFO_URL - + "/" - + batchJobProxy.getJobId()) - .then() + Response response = + given().get( + HOST + + instance.getCluster() + .getLocalMember() + .getAddress() + .getPort() + + RestConstant.JOB_INFO_URL + + "/" + + batchJobProxy.getJobId()); + // In the test example, the data size of a single [3, "C", 100] is 13 + int dataSize = 13; + response.prettyPrint(); + response.then() .statusCode(200) .body("jobName", equalTo("batch_fake_multi_table_to_console")) .body("jobStatus", equalTo("FINISHED")) - .body("metrics.SourceReceivedCount", equalTo("50")) - .body("metrics.SinkWriteCount", equalTo("50")) + .body("metrics.SourceReceivedCount", equalTo("15")) + .body("metrics.SinkWriteCount", equalTo("15")) .body( "metrics.TableSourceReceivedCount.'fake.table1'", - equalTo("20")) + equalTo("10")) .body( "metrics.TableSourceReceivedCount.'fake.public.table2'", - equalTo("30")) + equalTo("5")) .body( "metrics.TableSinkWriteCount.'fake.table1'", - equalTo("20")) + equalTo("10")) .body( "metrics.TableSinkWriteCount.'fake.public.table2'", - equalTo("30")); + equalTo("5")) + .body( + "metrics.SourceReceivedBytes", + equalTo(String.valueOf(dataSize * 15))) + .body( + "metrics.SinkWriteBytes", + equalTo(String.valueOf(dataSize * 15))) + .body( + "metrics.TableSourceReceivedBytes.'fake.table1'", + equalTo(String.valueOf(dataSize * 10))) + .body( + "metrics.TableSourceReceivedBytes.'fake.public.table2'", + equalTo(String.valueOf(dataSize * 5))) + .body( + "metrics.TableSinkWriteBytes.'fake.table1'", + equalTo(String.valueOf(dataSize * 10))) + .body( + "metrics.TableSinkWriteBytes.'fake.public.table2'", + equalTo(String.valueOf(dataSize * 5))); + Assertions.assertTrue( + Double.parseDouble(response.path("metrics.SourceReceivedQPS")) + > 0 + && Double.parseDouble( + response.path( + "metrics.TableSourceReceivedQPS.'fake.table1'")) + > 0 + && Double.parseDouble( + response.path( + "metrics.TableSourceReceivedQPS.'fake.public.table2'")) + > 0 + && Double.parseDouble( + response.path("metrics.SinkWriteQPS")) + > 0 + && Double.parseDouble( + response.path( + "metrics.TableSinkWriteQPS.'fake.table1'")) + > 0 + && Double.parseDouble( + response.path( + "metrics.TableSinkWriteQPS.'fake.public.table2'")) + > 0 + && Double.parseDouble( + response.path( + "metrics.SourceReceivedBytesPerSeconds")) + > 0 + && Double.parseDouble( + response.path( + "metrics.TableSourceReceivedBytesPerSeconds.'fake.table1'")) + > 0 + && Double.parseDouble( + response.path( + "metrics.TableSourceReceivedBytesPerSeconds.'fake.public.table2'")) + > 0 + && Double.parseDouble( + response.path( + "metrics.SinkWriteBytesPerSeconds")) + > 0 + && Double.parseDouble( + response.path( + "metrics.TableSinkWriteBytesPerSeconds.'fake.table1'")) + > 0 + && Double.parseDouble( + response.path( + "metrics.TableSinkWriteBytesPerSeconds.'fake.public.table2'")) + > 0); }); } diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf index c51929a0edb..7459cc150e4 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf @@ -26,29 +26,93 @@ env { source { # This is a example source plugin **only for test and demonstrate the feature source plugin** FakeSource { - result_table_name = "fake1" - row.num = 20 - schema = { - table = "fake.table1" - fields { - name = "string" - age = "int" + result_table_name = "fake1" + schema = { + table = "fake.table1" + fields { + id = bigint + name = string + score = int + } } + rows = [ + { + kind = INSERT + fields = [1, "A", 100] + }, + { + kind = INSERT + fields = [2, "B", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + } + { + kind = UPDATE_BEFORE + fields = [1, "A", 100] + }, + { + kind = UPDATE_AFTER + fields = [1, "A", 300] + }, + { + kind = DELETE + fields = [2, "B", 100] + }, + { + kind = INSERT + fields = [2, "B", 100] + } + ] } - } - FakeSource { - result_table_name = "fake2" - row.num = 30 - schema = { - table = "fake.public.table2" - fields { - name = "string" - age = "int" - sex = "int" + FakeSource { + result_table_name = "fake2" + schema = { + table = "fake.public.table2" + fields { + id = bigint + name = string + score = int + } + } + rows = [ + { + kind = INSERT + fields = [1, "A", 100] + }, + { + kind = INSERT + fields = [2, "B", 100] + }, + { + kind = DELETE + fields = [2, "B", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + } + ] } - } - } + } transform { diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java index 100aa0b3203..a8275a13b74 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java +++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java @@ -62,8 +62,12 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES; +import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES_PER_SECONDS; import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT; import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_QPS; +import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES; +import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES_PER_SECONDS; import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_COUNT; import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_QPS; import static org.awaitility.Awaitility.await; @@ -592,6 +596,23 @@ public void testGetMultiTableJobMetrics() { jobMetrics.contains(SOURCE_RECEIVED_COUNT + "#fake.public.table2")); Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT + "#fake.table1")); Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT + "#fake.public.table2")); + Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_BYTES + "#fake.table1")); + Assertions.assertTrue( + jobMetrics.contains(SOURCE_RECEIVED_BYTES + "#fake.public.table2")); + Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_BYTES + "#fake.table1")); + Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_BYTES + "#fake.public.table2")); + Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_QPS + "#fake.table1")); + Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_QPS + "#fake.public.table2")); + Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_QPS + "#fake.table1")); + Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_QPS + "#fake.public.table2")); + Assertions.assertTrue( + jobMetrics.contains(SOURCE_RECEIVED_BYTES_PER_SECONDS + "#fake.table1")); + Assertions.assertTrue( + jobMetrics.contains(SOURCE_RECEIVED_BYTES_PER_SECONDS + "#fake.public.table2")); + Assertions.assertTrue( + jobMetrics.contains(SINK_WRITE_BYTES_PER_SECONDS + "#fake.table1")); + Assertions.assertTrue( + jobMetrics.contains(SINK_WRITE_BYTES_PER_SECONDS + "#fake.public.table2")); log.info("jobMetrics : {}", jobMetrics); JsonNode jobMetricsStr = new ObjectMapper().readTree(jobMetrics); @@ -600,10 +621,6 @@ public void testGetMultiTableJobMetrics() { Spliterators.spliteratorUnknownSize( jobMetricsStr.fieldNames(), 0), false) - .filter( - metricName -> - metricName.startsWith(SOURCE_RECEIVED_COUNT) - || metricName.startsWith(SINK_WRITE_COUNT)) .collect(Collectors.toList()); Map totalCount = @@ -654,6 +671,31 @@ public void testGetMultiTableJobMetrics() { .filter(e -> e.getKey().startsWith(SINK_WRITE_COUNT)) .mapToLong(Map.Entry::getValue) .sum()); + Assertions.assertEquals( + totalCount.get(SOURCE_RECEIVED_BYTES), + tableCount.entrySet().stream() + .filter(e -> e.getKey().startsWith(SOURCE_RECEIVED_BYTES + "#")) + .mapToLong(Map.Entry::getValue) + .sum()); + Assertions.assertEquals( + totalCount.get(SINK_WRITE_BYTES), + tableCount.entrySet().stream() + .filter(e -> e.getKey().startsWith(SINK_WRITE_BYTES + "#")) + .mapToLong(Map.Entry::getValue) + .sum()); + // Instantaneous rates in the same direction are directly added + Assertions.assertEquals( + totalCount.get(SOURCE_RECEIVED_QPS), + tableCount.entrySet().stream() + .filter(e -> e.getKey().startsWith(SOURCE_RECEIVED_QPS + "#")) + .mapToLong(Map.Entry::getValue) + .sum()); + Assertions.assertEquals( + totalCount.get(SINK_WRITE_QPS), + tableCount.entrySet().stream() + .filter(e -> e.getKey().startsWith(SINK_WRITE_QPS + "#")) + .mapToLong(Map.Entry::getValue) + .sum()); } catch (ExecutionException | InterruptedException | JsonProcessingException e) { throw new RuntimeException(e); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/TaskMetricsCalcContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/TaskMetricsCalcContext.java new file mode 100644 index 00000000000..eab9ecbd348 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/TaskMetricsCalcContext.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.engine.server.metrics; + +import org.apache.seatunnel.api.common.metrics.Counter; +import org.apache.seatunnel.api.common.metrics.Meter; +import org.apache.seatunnel.api.common.metrics.MetricsContext; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.constants.PluginType; + +import org.apache.commons.lang3.StringUtils; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES; +import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES_PER_SECONDS; +import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT; +import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_QPS; +import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES; +import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES_PER_SECONDS; +import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_COUNT; +import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_QPS; + +public class TaskMetricsCalcContext { + + private final MetricsContext metricsContext; + + private final PluginType type; + + private Counter count; + + private Map countPerTable = new ConcurrentHashMap<>(); + + private Meter QPS; + + private Map QPSPerTable = new ConcurrentHashMap<>(); + + private Counter bytes; + + private Map bytesPerTable = new ConcurrentHashMap<>(); + + private Meter bytesPerSeconds; + + private Map bytesPerSecondsPerTable = new ConcurrentHashMap<>(); + + public TaskMetricsCalcContext( + MetricsContext metricsContext, + PluginType type, + boolean isMulti, + List tables) { + this.metricsContext = metricsContext; + this.type = type; + initializeMetrics(isMulti, tables); + } + + private void initializeMetrics(boolean isMulti, List tables) { + if (type.equals(PluginType.SINK)) { + this.initializeMetrics( + isMulti, + tables, + SINK_WRITE_COUNT, + SINK_WRITE_QPS, + SINK_WRITE_BYTES, + SINK_WRITE_BYTES_PER_SECONDS); + } else if (type.equals(PluginType.SOURCE)) { + this.initializeMetrics( + isMulti, + tables, + SOURCE_RECEIVED_COUNT, + SOURCE_RECEIVED_QPS, + SOURCE_RECEIVED_BYTES, + SOURCE_RECEIVED_BYTES_PER_SECONDS); + } + } + + private void initializeMetrics( + boolean isMulti, + List tables, + String countName, + String qpsName, + String bytesName, + String bytesPerSecondsName) { + count = metricsContext.counter(countName); + QPS = metricsContext.meter(qpsName); + bytes = metricsContext.counter(bytesName); + bytesPerSeconds = metricsContext.meter(bytesPerSecondsName); + if (isMulti) { + tables.forEach( + tablePath -> { + countPerTable.put( + tablePath.getFullName(), + metricsContext.counter(countName + "#" + tablePath.getFullName())); + QPSPerTable.put( + tablePath.getFullName(), + metricsContext.meter(qpsName + "#" + tablePath.getFullName())); + bytesPerTable.put( + tablePath.getFullName(), + metricsContext.counter(bytesName + "#" + tablePath.getFullName())); + bytesPerSecondsPerTable.put( + tablePath.getFullName(), + metricsContext.meter( + bytesPerSecondsName + "#" + tablePath.getFullName())); + }); + } + } + + public void updateMetrics(Object data) { + count.inc(); + QPS.markEvent(); + if (data instanceof SeaTunnelRow) { + SeaTunnelRow row = (SeaTunnelRow) data; + bytes.inc(row.getBytesSize()); + bytesPerSeconds.markEvent(row.getBytesSize()); + String tableId = row.getTableId(); + + if (StringUtils.isNotBlank(tableId)) { + String tableName = TablePath.of(tableId).getFullName(); + + // Processing count + processMetrics( + countPerTable, + Counter.class, + tableName, + SINK_WRITE_COUNT, + SOURCE_RECEIVED_COUNT, + Counter::inc); + + // Processing bytes + processMetrics( + bytesPerTable, + Counter.class, + tableName, + SINK_WRITE_BYTES, + SOURCE_RECEIVED_BYTES, + counter -> counter.inc(row.getBytesSize())); + + // Processing QPS + processMetrics( + QPSPerTable, + Meter.class, + tableName, + SINK_WRITE_QPS, + SOURCE_RECEIVED_QPS, + Meter::markEvent); + + // Processing bytes rate + processMetrics( + bytesPerSecondsPerTable, + Meter.class, + tableName, + SINK_WRITE_BYTES_PER_SECONDS, + SOURCE_RECEIVED_BYTES_PER_SECONDS, + meter -> meter.markEvent(row.getBytesSize())); + } + } + } + + private void processMetrics( + Map metricMap, + Class cls, + String tableName, + String sinkMetric, + String sourceMetric, + MetricProcessor processor) { + T metric = metricMap.get(tableName); + if (Objects.nonNull(metric)) { + processor.process(metric); + } else { + String metricName = + PluginType.SINK.equals(type) + ? sinkMetric + "#" + tableName + : sourceMetric + "#" + tableName; + T newMetric = createMetric(metricsContext, metricName, cls); + processor.process(newMetric); + metricMap.put(tableName, newMetric); + } + } + + private T createMetric( + MetricsContext metricsContext, String metricName, Class metricClass) { + if (metricClass == Counter.class) { + return metricClass.cast(metricsContext.counter(metricName)); + } else if (metricClass == Meter.class) { + return metricClass.cast(metricsContext.meter(metricName)); + } + throw new IllegalArgumentException("Unsupported metric class: " + metricClass.getName()); + } + + @FunctionalInterface + interface MetricProcessor { + void process(T t); + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java index d5d60b7cbb4..fec77708b68 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java @@ -44,6 +44,8 @@ import org.apache.seatunnel.engine.server.resourcemanager.resource.OverviewInfo; import org.apache.seatunnel.engine.server.utils.NodeEngineUtil; +import org.apache.commons.lang3.ArrayUtils; + import com.hazelcast.cluster.Address; import com.hazelcast.cluster.Cluster; import com.hazelcast.cluster.Member; @@ -65,12 +67,20 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; -import java.util.Spliterators; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import java.util.stream.Stream; import java.util.stream.StreamSupport; import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_500; +import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES; +import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES_PER_SECONDS; +import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT; +import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_QPS; +import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES; +import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES_PER_SECONDS; +import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_COUNT; +import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_QPS; import static org.apache.seatunnel.engine.server.rest.RestConstant.FINISHED_JOBS_INFO; import static org.apache.seatunnel.engine.server.rest.RestConstant.JOB_INFO_URL; import static org.apache.seatunnel.engine.server.rest.RestConstant.OVERVIEW; @@ -81,14 +91,22 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor { - private static final String SOURCE_RECEIVED_COUNT = "SourceReceivedCount"; private static final String TABLE_SOURCE_RECEIVED_COUNT = "TableSourceReceivedCount"; - private static final String SINK_WRITE_COUNT = "SinkWriteCount"; private static final String TABLE_SINK_WRITE_COUNT = "TableSinkWriteCount"; + private static final String TABLE_SOURCE_RECEIVED_QPS = "TableSourceReceivedQPS"; + private static final String TABLE_SINK_WRITE_QPS = "TableSinkWriteQPS"; + private static final String TABLE_SOURCE_RECEIVED_BYTES = "TableSourceReceivedBytes"; + private static final String TABLE_SINK_WRITE_BYTES = "TableSinkWriteBytes"; + private static final String TABLE_SOURCE_RECEIVED_BYTES_PER_SECONDS = + "TableSourceReceivedBytesPerSeconds"; + private static final String TABLE_SINK_WRITE_BYTES_PER_SECONDS = + "TableSinkWriteBytesPerSeconds"; + private final Log4j2HttpGetCommandProcessor original; private NodeEngine nodeEngine; public RestHttpGetCommandProcessor(TextCommandService textCommandService) { + this(textCommandService, new Log4j2HttpGetCommandProcessor(textCommandService)); } @@ -369,43 +387,165 @@ private void getRunningThread(HttpGetCommand command) { private Map getJobMetrics(String jobMetrics) { Map metricsMap = new HashMap<>(); - long sourceReadCount = 0L; - long sinkWriteCount = 0L; - Map tableSourceReceivedCountMap = new HashMap<>(); - Map tableSinkWriteCountMap = new HashMap<>(); + // To add metrics, populate the corresponding array, + String[] countMetricsNames = { + SOURCE_RECEIVED_COUNT, SINK_WRITE_COUNT, SOURCE_RECEIVED_BYTES, SINK_WRITE_BYTES + }; + String[] rateMetricsNames = { + SOURCE_RECEIVED_QPS, + SINK_WRITE_QPS, + SOURCE_RECEIVED_BYTES_PER_SECONDS, + SINK_WRITE_BYTES_PER_SECONDS + }; + String[] tableCountMetricsNames = { + TABLE_SOURCE_RECEIVED_COUNT, + TABLE_SINK_WRITE_COUNT, + TABLE_SOURCE_RECEIVED_BYTES, + TABLE_SINK_WRITE_BYTES + }; + String[] tableRateMetricsNames = { + TABLE_SOURCE_RECEIVED_QPS, + TABLE_SINK_WRITE_QPS, + TABLE_SOURCE_RECEIVED_BYTES_PER_SECONDS, + TABLE_SINK_WRITE_BYTES_PER_SECONDS + }; + Long[] metricsSums = + Stream.generate(() -> 0L).limit(countMetricsNames.length).toArray(Long[]::new); + Double[] metricsRates = + Stream.generate(() -> 0D).limit(rateMetricsNames.length).toArray(Double[]::new); + + // Used to store various indicators at the table + Map[] tableMetricsMaps = + new Map[] { + new HashMap<>(), // Source Received Count + new HashMap<>(), // Sink Write Count + new HashMap<>(), // Source Received Bytes + new HashMap<>(), // Sink Write Bytes + new HashMap<>(), // Source Received QPS + new HashMap<>(), // Sink Write QPS + new HashMap<>(), // Source Received Bytes Per Second + new HashMap<>() // Sink Write Bytes Per Second + }; + try { JsonNode jobMetricsStr = new ObjectMapper().readTree(jobMetrics); - StreamSupport.stream( - Spliterators.spliteratorUnknownSize(jobMetricsStr.fieldNames(), 0), - false) - .filter(metricName -> metricName.contains("#")) - .forEach( + + jobMetricsStr + .fieldNames() + .forEachRemaining( metricName -> { - String tableName = - TablePath.of(metricName.split("#")[1]).getFullName(); - if (metricName.startsWith(SOURCE_RECEIVED_COUNT)) { - tableSourceReceivedCountMap.put( - tableName, jobMetricsStr.get(metricName)); - } - if (metricName.startsWith(SOURCE_RECEIVED_COUNT)) { - tableSinkWriteCountMap.put( - tableName, jobMetricsStr.get(metricName)); + if (metricName.contains("#")) { + String tableName = + TablePath.of(metricName.split("#")[1]).getFullName(); + JsonNode metricNode = jobMetricsStr.get(metricName); + processMetric( + metricName, tableName, metricNode, tableMetricsMaps); } }); - JsonNode sourceReceivedCountJson = jobMetricsStr.get(SOURCE_RECEIVED_COUNT); - JsonNode sinkWriteCountJson = jobMetricsStr.get(SINK_WRITE_COUNT); - for (int i = 0; i < jobMetricsStr.get(SOURCE_RECEIVED_COUNT).size(); i++) { - JsonNode sourceReader = sourceReceivedCountJson.get(i); - JsonNode sinkWriter = sinkWriteCountJson.get(i); - sourceReadCount += sourceReader.get("value").asLong(); - sinkWriteCount += sinkWriter.get("value").asLong(); - } - } catch (JsonProcessingException | NullPointerException e) { + + // Aggregation summary and rate metrics + aggregateMetrics( + jobMetricsStr, + metricsSums, + metricsRates, + ArrayUtils.addAll(countMetricsNames, rateMetricsNames)); + + } catch (JsonProcessingException e) { return metricsMap; } - Map tableSourceReceivedCount = - tableSourceReceivedCountMap.entrySet().stream() + populateMetricsMap( + metricsMap, + tableMetricsMaps, + ArrayUtils.addAll(tableCountMetricsNames, tableRateMetricsNames), + countMetricsNames.length); + populateMetricsMap( + metricsMap, + Stream.concat(Arrays.stream(metricsSums), Arrays.stream(metricsRates)) + .toArray(Number[]::new), + ArrayUtils.addAll(countMetricsNames, rateMetricsNames), + metricsSums.length); + + return metricsMap; + } + + private void processMetric( + String metricName, + String tableName, + JsonNode metricNode, + Map[] tableMetricsMaps) { + if (metricNode == null) return; + + // Define index constant + final int SOURCE_COUNT_IDX = 0, + SINK_COUNT_IDX = 1, + SOURCE_BYTES_IDX = 2, + SINK_BYTES_IDX = 3, + SOURCE_QPS_IDX = 4, + SINK_QPS_IDX = 5, + SOURCE_BYTES_SEC_IDX = 6, + SINK_BYTES_SEC_IDX = 7; + if (metricName.startsWith(SOURCE_RECEIVED_COUNT + "#")) { + tableMetricsMaps[SOURCE_COUNT_IDX].put(tableName, metricNode); + } else if (metricName.startsWith(SINK_WRITE_COUNT + "#")) { + tableMetricsMaps[SINK_COUNT_IDX].put(tableName, metricNode); + } else if (metricName.startsWith(SOURCE_RECEIVED_BYTES + "#")) { + tableMetricsMaps[SOURCE_BYTES_IDX].put(tableName, metricNode); + } else if (metricName.startsWith(SINK_WRITE_BYTES + "#")) { + tableMetricsMaps[SINK_BYTES_IDX].put(tableName, metricNode); + } else if (metricName.startsWith(SOURCE_RECEIVED_QPS + "#")) { + tableMetricsMaps[SOURCE_QPS_IDX].put(tableName, metricNode); + } else if (metricName.startsWith(SINK_WRITE_QPS + "#")) { + tableMetricsMaps[SINK_QPS_IDX].put(tableName, metricNode); + } else if (metricName.startsWith(SOURCE_RECEIVED_BYTES_PER_SECONDS + "#")) { + tableMetricsMaps[SOURCE_BYTES_SEC_IDX].put(tableName, metricNode); + } else if (metricName.startsWith(SINK_WRITE_BYTES_PER_SECONDS + "#")) { + tableMetricsMaps[SINK_BYTES_SEC_IDX].put(tableName, metricNode); + } + } + + private void aggregateMetrics( + JsonNode jobMetricsStr, + Long[] metricsSums, + Double[] metricsRates, + String[] metricsNames) { + for (int i = 0; i < metricsNames.length; i++) { + JsonNode metricNode = jobMetricsStr.get(metricsNames[i]); + if (metricNode != null && metricNode.isArray()) { + for (JsonNode node : metricNode) { + // Match Rate Metrics vs. Value Metrics + if (i < metricsSums.length) { + metricsSums[i] += node.path("value").asLong(); + } else { + metricsRates[i - metricsSums.length] += node.path("value").asDouble(); + } + } + } + } + } + + private void populateMetricsMap( + Map metricsMap, + Object[] metrics, + String[] metricNames, + int countMetricNames) { + for (int i = 0; i < metrics.length; i++) { + if (metrics[i] != null) { + if (metrics[i] instanceof Map) { + metricsMap.put( + metricNames[i], + aggregateMap( + (Map) metrics[i], i >= countMetricNames)); + } else { + metricsMap.put(metricNames[i], metrics[i]); + } + } + } + } + + public static Map aggregateMap(Map inputMap, boolean isRate) { + return isRate + ? inputMap.entrySet().stream() .collect( Collectors.toMap( Map.Entry::getKey, @@ -413,11 +553,12 @@ private Map getJobMetrics(String jobMetrics) { StreamSupport.stream( entry.getValue().spliterator(), false) - .mapToLong( - node -> node.get("value").asLong()) - .sum())); - Map tableSinkWriteCount = - tableSinkWriteCountMap.entrySet().stream() + .mapToDouble( + node -> + node.path("value") + .asDouble()) + .sum())) + : inputMap.entrySet().stream() .collect( Collectors.toMap( Map.Entry::getKey, @@ -426,14 +567,8 @@ private Map getJobMetrics(String jobMetrics) { entry.getValue().spliterator(), false) .mapToLong( - node -> node.get("value").asLong()) + node -> node.path("value").asLong()) .sum())); - - metricsMap.put(SOURCE_RECEIVED_COUNT, sourceReadCount); - metricsMap.put(SINK_WRITE_COUNT, sinkWriteCount); - metricsMap.put(TABLE_SOURCE_RECEIVED_COUNT, tableSourceReceivedCount); - metricsMap.put(TABLE_SINK_WRITE_COUNT, tableSinkWriteCount); - return metricsMap; } private SeaTunnelServer getSeaTunnelServer(boolean shouldBeMaster) { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java index e1b24947893..7f2e34bdcb8 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java @@ -17,8 +17,6 @@ package org.apache.seatunnel.engine.server.task; -import org.apache.seatunnel.api.common.metrics.Counter; -import org.apache.seatunnel.api.common.metrics.Meter; import org.apache.seatunnel.api.common.metrics.MetricsContext; import org.apache.seatunnel.api.source.Collector; import org.apache.seatunnel.api.table.catalog.TablePath; @@ -30,13 +28,14 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.core.starter.flowcontrol.FlowControlGate; import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy; import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; +import org.apache.seatunnel.engine.server.metrics.TaskMetricsCalcContext; import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle; import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.StringUtils; import lombok.extern.slf4j.Slf4j; @@ -44,15 +43,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; -import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES; -import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES_PER_SECONDS; -import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_COUNT; -import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_QPS; - @Slf4j public class SeaTunnelSourceCollector implements Collector { @@ -62,19 +54,12 @@ public class SeaTunnelSourceCollector implements Collector { private final MetricsContext metricsContext; + private final TaskMetricsCalcContext taskMetricsCalcContext; + private final AtomicBoolean schemaChangeBeforeCheckpointSignal = new AtomicBoolean(false); private final AtomicBoolean schemaChangeAfterCheckpointSignal = new AtomicBoolean(false); - private final Counter sourceReceivedCount; - - private final Map sourceReceivedCountPerTable = new ConcurrentHashMap<>(); - - private final Meter sourceReceivedQPS; - private final Counter sourceReceivedBytes; - - private final Meter sourceReceivedBytesPerSeconds; - private volatile boolean emptyThisPollNext; private final DataTypeChangeEventHandler dataTypeChangeEventHandler = new DataTypeChangeEventDispatcher(); @@ -98,20 +83,12 @@ public SeaTunnelSourceCollector( .iterator() .forEachRemaining(type -> this.rowTypeMap.put(type.getKey(), type.getValue())); } - if (CollectionUtils.isNotEmpty(tablePaths)) { - tablePaths.forEach( - tablePath -> - sourceReceivedCountPerTable.put( - tablePath.getFullName(), - metricsContext.counter( - SOURCE_RECEIVED_COUNT - + "#" - + tablePath.getFullName()))); - } - sourceReceivedCount = metricsContext.counter(SOURCE_RECEIVED_COUNT); - sourceReceivedQPS = metricsContext.meter(SOURCE_RECEIVED_QPS); - sourceReceivedBytes = metricsContext.counter(SOURCE_RECEIVED_BYTES); - sourceReceivedBytesPerSeconds = metricsContext.meter(SOURCE_RECEIVED_BYTES_PER_SECONDS); + this.taskMetricsCalcContext = + new TaskMetricsCalcContext( + metricsContext, + PluginType.SOURCE, + CollectionUtils.isNotEmpty(tablePaths), + tablePaths); flowControlGate = FlowControlGate.create(flowControlStrategy); } @@ -129,26 +106,11 @@ public void collect(T row) { throw new SeaTunnelEngineException( "Unsupported row type: " + rowType.getClass().getName()); } - sourceReceivedBytes.inc(size); - sourceReceivedBytesPerSeconds.markEvent(size); flowControlGate.audit((SeaTunnelRow) row); - if (StringUtils.isNotEmpty(tableId)) { - String tableName = TablePath.of(tableId).getFullName(); - Counter sourceTableCounter = sourceReceivedCountPerTable.get(tableName); - if (Objects.nonNull(sourceTableCounter)) { - sourceTableCounter.inc(); - } else { - Counter counter = - metricsContext.counter(SOURCE_RECEIVED_COUNT + "#" + tableName); - counter.inc(); - sourceReceivedCountPerTable.put(tableName, counter); - } - } + taskMetricsCalcContext.updateMetrics(row); } sendRecordToNext(new Record<>(row)); emptyThisPollNext = false; - sourceReceivedCount.inc(); - sourceReceivedQPS.markEvent(); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java index de8257f1e94..cacaa75aaef 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java @@ -17,8 +17,6 @@ package org.apache.seatunnel.engine.server.task.flow; -import org.apache.seatunnel.api.common.metrics.Counter; -import org.apache.seatunnel.api.common.metrics.Meter; import org.apache.seatunnel.api.common.metrics.MetricsContext; import org.apache.seatunnel.api.event.EventListener; import org.apache.seatunnel.api.serialization.Serializer; @@ -30,13 +28,14 @@ import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.event.SchemaChangeEvent; import org.apache.seatunnel.api.table.type.Record; -import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener; import org.apache.seatunnel.engine.core.dag.actions.SinkAction; import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey; import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState; import org.apache.seatunnel.engine.server.event.JobEventListener; import org.apache.seatunnel.engine.server.execution.TaskLocation; +import org.apache.seatunnel.engine.server.metrics.TaskMetricsCalcContext; import org.apache.seatunnel.engine.server.task.SeaTunnelTask; import org.apache.seatunnel.engine.server.task.context.SinkWriterContext; import org.apache.seatunnel.engine.server.task.operation.GetTaskGroupAddressOperation; @@ -45,8 +44,6 @@ import org.apache.seatunnel.engine.server.task.operation.sink.SinkRegisterOperation; import org.apache.seatunnel.engine.server.task.record.Barrier; -import org.apache.commons.lang3.StringUtils; - import com.hazelcast.cluster.Address; import lombok.extern.slf4j.Slf4j; @@ -56,18 +53,12 @@ import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; -import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES; -import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES_PER_SECONDS; -import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT; -import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_QPS; import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky; import static org.apache.seatunnel.engine.server.task.AbstractTask.serializeStates; @@ -96,15 +87,7 @@ public class SinkFlowLifeCycle sinkWriteCountPerTable = new ConcurrentHashMap<>(); - - private Meter sinkWriteQPS; - - private Counter sinkWriteBytes; - - private Meter sinkWriteBytesPerSeconds; + private TaskMetricsCalcContext taskMetricsCalcContext; private final boolean containAggCommitter; @@ -129,19 +112,13 @@ public SinkFlowLifeCycle( this.containAggCommitter = containAggCommitter; this.metricsContext = metricsContext; this.eventListener = new JobEventListener(taskLocation, runningTask.getExecutionContext()); - sinkWriteCount = metricsContext.counter(SINK_WRITE_COUNT); - sinkWriteQPS = metricsContext.meter(SINK_WRITE_QPS); - sinkWriteBytes = metricsContext.counter(SINK_WRITE_BYTES); - sinkWriteBytesPerSeconds = metricsContext.meter(SINK_WRITE_BYTES_PER_SECONDS); - if (sinkAction.getSink() instanceof MultiTableSink) { - List sinkTables = ((MultiTableSink) sinkAction.getSink()).getSinkTables(); - sinkTables.forEach( - tablePath -> - sinkWriteCountPerTable.put( - tablePath.getFullName(), - metricsContext.counter( - SINK_WRITE_COUNT + "#" + tablePath.getFullName()))); + List sinkTables = new ArrayList<>(); + boolean isMulti = sinkAction.getSink() instanceof MultiTableSink; + if (isMulti) { + sinkTables = ((MultiTableSink) sinkAction.getSink()).getSinkTables(); } + this.taskMetricsCalcContext = + new TaskMetricsCalcContext(metricsContext, PluginType.SINK, isMulti, sinkTables); } @Override @@ -267,26 +244,7 @@ public void received(Record record) { return; } writer.write((T) record.getData()); - sinkWriteCount.inc(); - sinkWriteQPS.markEvent(); - if (record.getData() instanceof SeaTunnelRow) { - long size = ((SeaTunnelRow) record.getData()).getBytesSize(); - sinkWriteBytes.inc(size); - sinkWriteBytesPerSeconds.markEvent(size); - String tableId = ((SeaTunnelRow) record.getData()).getTableId(); - if (StringUtils.isNotBlank(tableId)) { - String tableName = TablePath.of(tableId).getFullName(); - Counter sinkTableCounter = sinkWriteCountPerTable.get(tableName); - if (Objects.nonNull(sinkTableCounter)) { - sinkTableCounter.inc(); - } else { - Counter counter = - metricsContext.counter(SINK_WRITE_COUNT + "#" + tableName); - counter.inc(); - sinkWriteCountPerTable.put(tableName, counter); - } - } - } + taskMetricsCalcContext.updateMetrics(record.getData()); } } catch (Exception e) { throw new RuntimeException(e);