Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] [Zeta] Optimize CoordinatorService ThreadPool Configuration to Prevent Potential OOM #8241

Merged
merged 13 commits into from
Dec 19, 2024
21 changes: 21 additions & 0 deletions docs/en/seatunnel-engine/hybrid-cluster-deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,27 @@ seatunnel:

When `dynamic-slot: true` is used, the `job-schedule-strategy: WAIT` configuration will become invalid and will be forcibly changed to `job-schedule-strategy: REJECT`, because this parameter is meaningless in dynamic slots.

### 4.7 Coordinator Service

CoordinatorService responsible for the process of generating each job from a LogicalDag to an ExecutionDag,
and then to a PhysicalDag. It ultimately creates the JobMaster for the job to handle scheduling, execution, and state monitoring.

**core-thread-num**

The corePoolSize of seatunnel coordinator job's executor cached thread pool

**max-thread-num**

The max job count can be executed at same time

Example

```yaml
coordinator-service:
core-thread-num: 30
max-thread-num: 1000
```

## 5. Configure The SeaTunnel Engine Network Service

All SeaTunnel Engine network-related configurations are in the `hazelcast.yaml` file.
Expand Down
22 changes: 22 additions & 0 deletions docs/zh/seatunnel-engine/hybrid-cluster-deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,28 @@ seatunnel:

当`dynamic-slot: ture`时,`job-schedule-strategy: WAIT` 配置会失效,将被强制修改为`job-schedule-strategy: REJECT`,因为动态Slot时该参数没有意义,可以直接提交。

### 4.7 Coordinator Service

CoordinatorService 提供了每个作业从 LogicalDag 到 ExecutionDag,再到 PhysicalDag 的生成流程, 并最终创建作业的 JobMaster 进行作业的调度执行和状态监控

**core-thread-num**

配置 CoordinatorService 线程池核心线程数量

**max-thread-num**

同时可执行的最大作业数量

Example

```yaml
coordinator-service:
core-thread-num: 30
max-thread-num: 1000
```



## 5. 配置 SeaTunnel Engine 网络服务

所有 SeaTunnel Engine 网络相关的配置都在 `hazelcast.yaml` 文件中.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
import org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageConfig;
import org.apache.seatunnel.engine.common.config.server.CoordinatorServiceConfig;
import org.apache.seatunnel.engine.common.config.server.HttpConfig;
import org.apache.seatunnel.engine.common.config.server.QueueType;
import org.apache.seatunnel.engine.common.config.server.ScheduleStrategy;
Expand Down Expand Up @@ -57,6 +58,9 @@ public class EngineConfig {

private CheckpointConfig checkpointConfig = ServerConfigOptions.CHECKPOINT.defaultValue();

private CoordinatorServiceConfig coordinatorServiceConfig =
ServerConfigOptions.COORDINATOR_SERVICE.defaultValue();

private ConnectorJarStorageConfig connectorJarStorageConfig =
ServerConfigOptions.CONNECTOR_JAR_STORAGE_CONFIG.defaultValue();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.seatunnel.engine.common.config.server.ConnectorJarHAStorageConfig;
import org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageConfig;
import org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageMode;
import org.apache.seatunnel.engine.common.config.server.CoordinatorServiceConfig;
import org.apache.seatunnel.engine.common.config.server.HttpConfig;
import org.apache.seatunnel.engine.common.config.server.QueueType;
import org.apache.seatunnel.engine.common.config.server.ScheduleStrategy;
Expand Down Expand Up @@ -106,6 +107,25 @@ private SlotServiceConfig parseSlotServiceConfig(Node slotServiceNode) {
return slotServiceConfig;
}

private CoordinatorServiceConfig parseCoordinatorServiceConfig(Node coordinatorServiceNode) {
CoordinatorServiceConfig coordinatorServiceConfig = new CoordinatorServiceConfig();
for (Node node : childElements(coordinatorServiceNode)) {
String name = cleanNodeName(node);
if (ServerConfigOptions.MAX_THREAD_NUM.key().equals(name)) {
coordinatorServiceConfig.setMaxThreadNum(
getIntegerValue(
ServerConfigOptions.MAX_THREAD_NUM.key(), getTextContent(node)));
} else if (ServerConfigOptions.CORE_THREAD_NUM.key().equals(name)) {
coordinatorServiceConfig.setCoreThreadNum(
getIntegerValue(
ServerConfigOptions.CORE_THREAD_NUM.key(), getTextContent(node)));
} else {
LOGGER.warning("Unrecognized element: " + name);
}
}
return coordinatorServiceConfig;
}

private void parseEngineConfig(Node engineNode, SeaTunnelConfig config) {
final EngineConfig engineConfig = config.getEngineConfig();
for (Node node : childElements(engineNode)) {
Expand Down Expand Up @@ -177,6 +197,8 @@ private void parseEngineConfig(Node engineNode, SeaTunnelConfig config) {
ScheduleStrategy.valueOf(getTextContent(node).toUpperCase(Locale.ROOT)));
} else if (ServerConfigOptions.HTTP.key().equals(name)) {
engineConfig.setHttpConfig(parseHttpConfig(node));
} else if (ServerConfigOptions.COORDINATOR_SERVICE.key().equals(name)) {
engineConfig.setCoordinatorServiceConfig(parseCoordinatorServiceConfig(node));
} else {
LOGGER.warning("Unrecognized element: " + name);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.engine.common.config.server;

import lombok.Data;

import java.io.Serializable;

import static com.hazelcast.internal.util.Preconditions.checkPositive;

@Data
public class CoordinatorServiceConfig implements Serializable {

private int coreThreadNum = ServerConfigOptions.CORE_THREAD_NUM.defaultValue();

private int maxThreadNum = ServerConfigOptions.MAX_THREAD_NUM.defaultValue();

public void setCoreThreadNum(int coreThreadNum) {
checkPositive(coreThreadNum, ServerConfigOptions.CORE_THREAD_NUM + " must be >= 0");
this.coreThreadNum = coreThreadNum;
}

public void setMaxThreadNum(int maxThreadNum) {
checkPositive(maxThreadNum, ServerConfigOptions.MAX_THREAD_NUM + " must be > 0");
this.maxThreadNum = maxThreadNum;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,25 @@ public class ServerConfigOptions {
.type(new TypeReference<Map<String, String>>() {})
.noDefaultValue()
.withDescription("The checkpoint storage instance configuration.");

public static final Option<Integer> CORE_THREAD_NUM =
Options.key("core-thread-num")
.intType()
.defaultValue(10)
.withDescription("The core thread num of coordinator service");

public static final Option<Integer> MAX_THREAD_NUM =
Options.key("max-thread-num")
.intType()
.defaultValue(Integer.MAX_VALUE)
.withDescription("The max thread num of coordinator service");

public static final Option<CoordinatorServiceConfig> COORDINATOR_SERVICE =
Options.key("coordinator-service")
.type(new TypeReference<CoordinatorServiceConfig>() {})
.defaultValue(new CoordinatorServiceConfig())
.withDescription("The coordinator service configuration.");

public static final Option<Integer> HISTORY_JOB_EXPIRE_MINUTES =
Options.key("history-job-expire-minutes")
.intType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ public void testSeaTunnelConfig() {
Assertions.assertTrue(config.getEngineConfig().getHttpConfig().isEnableDynamicPort());
Assertions.assertEquals(8080, config.getEngineConfig().getHttpConfig().getPort());
Assertions.assertEquals(200, config.getEngineConfig().getHttpConfig().getPortRange());
Assertions.assertEquals(
30, config.getEngineConfig().getCoordinatorServiceConfig().getCoreThreadNum());
Assertions.assertEquals(
1000, config.getEngineConfig().getCoordinatorServiceConfig().getMaxThreadNum());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ seatunnel:
slot-service:
dynamic-slot: false
slot-num: 5
coordinator-service:
core-thread-num: 30
max-thread-num: 1000
checkpoint:
interval: 6000
timeout: 7000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,12 @@ public CoordinatorService(
@NonNull SeaTunnelServer seaTunnelServer,
EngineConfig engineConfig) {
this.nodeEngine = nodeEngine;
this.engineConfig = engineConfig;
this.logger = nodeEngine.getLogger(getClass());
this.executorService =
new ThreadPoolExecutor(
0,
Integer.MAX_VALUE,
engineConfig.getCoordinatorServiceConfig().getCoreThreadNum(),
engineConfig.getCoordinatorServiceConfig().getMaxThreadNum(),
60L,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
Expand All @@ -212,7 +213,6 @@ public CoordinatorService(
.build(),
new ThreadPoolStatus.RejectionCountingHandler());
this.seaTunnelServer = seaTunnelServer;
this.engineConfig = engineConfig;
masterActiveListener = Executors.newSingleThreadScheduledExecutor();
masterActiveListener.scheduleAtFixedRate(
this::checkNewActiveMaster, 0, 100, TimeUnit.MILLISECONDS);
Expand Down
Loading