Skip to content

Commit

Permalink
feat: put returns failure if the server memory usage exceeds the spec…
Browse files Browse the repository at this point in the history
…ified ratio (#3631)
  • Loading branch information
dl239 authored Jan 15, 2024
1 parent bd9d142 commit 7b88a80
Show file tree
Hide file tree
Showing 29 changed files with 296 additions and 69 deletions.
1 change: 1 addition & 0 deletions docs/en/openmldb_sql/dml/LOAD_DATA_STATEMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ The following table introduces the parameters of `LOAD DATA INFILE`.
- As metioned in the above table, online execution mode only supports append input mode.
- When `deep_copy=false`, OpenMLDB doesn't support to modify the data in the soft link. Therefore, if the current offline data comes from a soft link, `append` import is no longer supported. Moreover, if current connection is soft copy, using the hard copy with `overwrite` will not delete the data of the soft connection.
- If the `insert_memory_usage_limit` session variable is set, a failure will be returned if the server memory usage exceeds the set value during online import
```

Expand Down
1 change: 1 addition & 0 deletions docs/en/reference/sql/ddl/SET_STATEMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ The following format is also equivalent.
| @@session.sync_job|@@sync_job | When the value is `true`, the offline command will be executed synchronously, waiting for the final result of the execution.<br />When the value is `false`, the offline command returns immediately. If you need to check the execution, please use `SHOW JOB` command. | `true`, <br /> `false` | `false` |
| @@session.sync_timeout|@@sync_timeout | When `sync_job=true`, you can configure the waiting time for synchronization commands. The timeout will return immediately. After the timeout returns, you can still view the command execution through `SHOW JOB`. | Int | 20000 |
| @@session.spark_config|@@spark_config | Set the Spark configuration for offline jobs, configure like 'spark.executor.memory=2g;spark.executor.cores=2'. Notice that the priority of this Spark configuration is higer than TaskManager Spark configuration but lower than CLI Spark configuration file. | String | "" |
| @@session.insert_memory_usage_limit |@@insert_memory_usage_limit | Set server memory usage limit when inserting or importing data. If the server memory usage exceeds the set value, the insertion will fail. The value range is 0-100. 0 means unlimited | Int | "0" |

## Example

Expand Down
1 change: 1 addition & 0 deletions docs/zh/openmldb_sql/ddl/SET_STATEMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ sessionVariableName ::= '@@'Identifier | '@@session.'Identifier | '@@global.'Ide
| @@session.sync_job|@@sync_job | 当该变量值为 `true`,离线的命令将变为同步,等待执行的最终结果。<br />当该变量值为 `false`,离线的命令即时返回,若要查看命令的执行情况,请使用`SHOW JOB`| "true" \| "false" | "false" |
| @@session.job_timeout|@@job_timeout | 可配置离线异步命令或离线管理命令的等待时间(以*毫秒*为单位),将立即返回。离线异步命令返回后仍可通过`SHOW JOB`查看命令执行情况。 | Int | "20000" |
| @@session.spark_config|@@spark_config | 设置离线任务的 Spark 参数,配置项参考 'spark.executor.memory=2g;spark.executor.cores=2'。注意此 Spark 配置优先级高于 TaskManager 默认 Spark 配置,低于命令行的 Spark 配置文件。 | String | "" |
| @@session.insert_memory_usage_limit|@@insert_memory_usage_limit | 设置数据插入或者数据导入时服务端内存使用率限制。取值范围为0-100。如果服务端内存使用率超过设置的值,就会插入失败。设置为0表示不限制 | Int | "0" |
## Example

### 设置和显示会话系统变量
Expand Down
2 changes: 2 additions & 0 deletions docs/zh/openmldb_sql/dml/LOAD_DATA_STATEMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ LOAD DATA INFILE 'hive://db1.t1' INTO TABLE t1;

在线导入只允许`mode='append'`,无法`overwrite``error_if_exists`

如果设置了 `insert_memory_usage_limit` session变量,服务端内存使用率超过设定的值就会返回失败。

## 离线导入规则

表的离线信息可通过`desc <table>`查看。我们将数据地址分为两类,离线地址是OpenMLDB的内部存储路径,硬拷贝将写入此地址,仅一个;软链接地址是软链接导入的地址列表。
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class OpenmldbSource implements TableProvider, DataSourceRegister {
// single: insert when read one row
// batch: insert when commit(after read a whole partition)
private String writerType = "single";
private int insertMemoryUsageLimit = 0;

@Override
public StructType inferSchema(CaseInsensitiveStringMap options) {
Expand Down Expand Up @@ -71,12 +72,15 @@ public StructType inferSchema(CaseInsensitiveStringMap options) {
writerType = options.get("writerType");
}

if (options.containsKey("insert_memory_usage_limit")) {
insertMemoryUsageLimit = Integer.parseInt(options.get("insert_memory_usage_limit"));
}
return null;
}

@Override
public Table getTable(StructType schema, Transform[] partitioning, Map<String, String> properties) {
return new OpenmldbTable(dbName, tableName, option, writerType);
return new OpenmldbTable(dbName, tableName, option, writerType, insertMemoryUsageLimit);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,17 @@ public class OpenmldbTable implements SupportsWrite, SupportsRead {
private final String tableName;
private final SdkOption option;
private final String writerType;
private final int insertMemoryUsageLimit;
private SqlExecutor executor = null;

private Set<TableCapability> capabilities;

public OpenmldbTable(String dbName, String tableName, SdkOption option, String writerType) {
public OpenmldbTable(String dbName, String tableName, SdkOption option, String writerType, int insertMemoryUsageLimit) {
this.dbName = dbName;
this.tableName = tableName;
this.option = option;
this.writerType = writerType;
this.insertMemoryUsageLimit = insertMemoryUsageLimit;
try {
this.executor = new SqlClusterExecutor(option);
// no need to check table exists, schema() will check it later
Expand All @@ -69,7 +71,7 @@ public OpenmldbTable(String dbName, String tableName, SdkOption option, String w

@Override
public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
OpenmldbWriteConfig config = new OpenmldbWriteConfig(dbName, tableName, option, writerType);
OpenmldbWriteConfig config = new OpenmldbWriteConfig(dbName, tableName, option, writerType, insertMemoryUsageLimit);
return new OpenmldbWriteBuilder(config, info);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public OpenmldbDataSingleWriter(OpenmldbWriteConfig config, int partitionId, lon
SqlClusterExecutor executor = new SqlClusterExecutor(option);
String dbName = config.dbName;
String tableName = config.tableName;
executor.executeSQL(dbName, "SET @@insert_memory_usage_limit=" + config.insertMemoryUsageLimit);

Schema schema = executor.getTableSchema(dbName, tableName);
// create insert placeholder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public OpenmldbDataWriter(OpenmldbWriteConfig config, int partitionId, long task
SqlClusterExecutor executor = new SqlClusterExecutor(option);
String dbName = config.dbName;
String tableName = config.tableName;
executor.executeSQL(dbName, "SET @@insert_memory_usage_limit=" + config.insertMemoryUsageLimit);

Schema schema = executor.getTableSchema(dbName, tableName);
// create insert placeholder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
// Must serializable
public class OpenmldbWriteConfig implements Serializable {
public final String dbName, tableName, zkCluster, zkPath, writerType;
public final int insertMemoryUsageLimit;

public OpenmldbWriteConfig(String dbName, String tableName, SdkOption option, String writerType) {
public OpenmldbWriteConfig(String dbName, String tableName, SdkOption option, String writerType, int insertMemoryUsageLimit) {
this.dbName = dbName;
this.tableName = tableName;
this.zkCluster = option.getZkCluster();
this.zkPath = option.getZkPath();
this.writerType = writerType;
this.insertMemoryUsageLimit = insertMemoryUsageLimit;
// TODO(hw): other configs in SdkOption
}
}
8 changes: 3 additions & 5 deletions src/apiserver/api_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,13 @@
#include "absl/status/status.h"
#include "apiserver/interface_provider.h"
#include "apiserver/json_helper.h"
#include "rapidjson/document.h" // raw rapidjson 1.1.0, not in butil
#include "bvar/bvar.h"
#include "bvar/multi_dimension.h" // latency recorder
#include "proto/api_server.pb.h"
#include "rapidjson/document.h" // raw rapidjson 1.1.0, not in butil
#include "sdk/sql_cluster_router.h"
#include "sdk/sql_request_row.h"

#include "absl/status/status.h"
#include "bvar/bvar.h"
#include "bvar/multi_dimension.h" // latency recorder

namespace openmldb {
namespace apiserver {

Expand Down
1 change: 1 addition & 0 deletions src/base/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ enum ReturnCode {
kInvalidArgs = 161,
kCheckIndexFailed = 162,
kCatalogUpdateFailed = 163,
kExceedPutMemoryLimit = 164,
kNameserverIsNotLeader = 300,
kAutoFailoverIsEnabled = 301,
kEndpointIsNotExist = 302,
Expand Down
107 changes: 107 additions & 0 deletions src/base/sys_info.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright 2021 4Paradigm
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef SRC_BASE_SYS_INFO_H_
#define SRC_BASE_SYS_INFO_H_

#include "absl/strings/ascii.h"
#include "absl/strings/match.h"
#include "absl/strings/numbers.h"
#include "absl/strings/string_view.h"
#include "base/status.h"

namespace openmldb::base {

constexpr const char* MEM_TOTAL = "MemTotal";
constexpr const char* MEM_BUFFERS = "Buffers";
constexpr const char* MEM_CACHED = "Cached";
constexpr const char* MEM_FREE = "MemFree";
constexpr const char* SRECLAIMABLE = "SReclaimable";

/* We calculate MemAvailable as follows
* MemAvailable = MemFree + Buffers + Cached + SReclaimable
* refer https://www.kernel.org/doc/Documentation/filesystems/proc.txt
* */

struct SysInfo {
uint64_t mem_total = 0; // unit is kB
uint64_t mem_used = 0; // unit is kB
uint64_t mem_free = 0; // unit is kB
uint64_t mem_buffers = 0; // unit is kB
uint64_t mem_cached = 0; // unit is kB
};

base::Status GetSysMem(SysInfo* info) {
#if defined(__linux__)
FILE *fd = fopen("/proc/meminfo", "r");
if (fd == nullptr) {
return {ReturnCode::kError, "fail to open meminfo file"};
}
char line[256];
auto parse = [](absl::string_view str, absl::string_view key, uint64_t* val) -> base::Status {
str.remove_prefix(key.size() + 1);
str.remove_suffix(2);
str = absl::StripAsciiWhitespace(str);
if (!absl::SimpleAtoi(str, val)) {
return {ReturnCode::kError, absl::StrCat("fail to parse ", key)};
}
return {};
};
int parse_cnt = 0;
uint64_t s_reclaimable = 0;
while (fgets(line, sizeof(line), fd)) {
absl::string_view str_view(line);
str_view = absl::StripAsciiWhitespace(str_view);
if (absl::StartsWith(str_view, MEM_TOTAL)) {
if (auto status = parse(str_view, MEM_TOTAL, &info->mem_total); !status.OK()) {
return status;
}
parse_cnt++;
} else if (absl::StartsWith(str_view, MEM_BUFFERS)) {
if (auto status = parse(str_view, MEM_BUFFERS, &info->mem_buffers); !status.OK()) {
return status;
}
parse_cnt++;
} else if (absl::StartsWith(str_view, MEM_CACHED)) {
if (auto status = parse(str_view, MEM_CACHED, &info->mem_cached); !status.OK()) {
return status;
}
parse_cnt++;
} else if (absl::StartsWith(str_view, MEM_FREE)) {
if (auto status = parse(str_view, MEM_FREE, &info->mem_free); !status.OK()) {
return status;
}
parse_cnt++;
} else if (absl::StartsWith(str_view, SRECLAIMABLE)) {
if (auto status = parse(str_view, SRECLAIMABLE, &s_reclaimable); !status.OK()) {
return status;
}
parse_cnt++;
}
}
if (parse_cnt != 5) {
return {ReturnCode::kError, "fail to parse meminfo"};
}
info->mem_cached += s_reclaimable;
info->mem_used = info->mem_total - info->mem_buffers - info->mem_cached - info->mem_free;
fclose(fd);
#endif
return {};
}

} // namespace openmldb::base

#endif // SRC_BASE_SYS_INFO_H_
50 changes: 50 additions & 0 deletions src/base/sys_info_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2021 4Paradigm
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "gtest/gtest.h"
#include "base/sys_info.h"

namespace openmldb {
namespace base {

class SystemInfoTest : public ::testing::Test {
public:
SystemInfoTest() {}
~SystemInfoTest() {}
};

TEST_F(SystemInfoTest, GetMemory) {
base::SysInfo info;
auto status = base::GetSysMem(&info);
ASSERT_TRUE(status.OK());
ASSERT_GT(info.mem_total, 0);
ASSERT_GT(info.mem_used, 0);
ASSERT_GT(info.mem_free, 0);
ASSERT_EQ(info.mem_total, info.mem_used + info.mem_buffers + info.mem_free + info.mem_cached);
/*printf("total:%lu\n", info.mem_total);
printf("used:%lu\n", info.mem_used);
printf("free:%lu\n", info.mem_free);
printf("buffers:%lu\n", info.mem_buffers);
printf("cached:%lu\n", info.mem_cached);*/
}

} // namespace base
} // namespace openmldb

int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
43 changes: 24 additions & 19 deletions src/client/tablet_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -201,36 +201,43 @@ bool TabletClient::UpdateTableMetaForAddField(uint32_t tid, const std::vector<op
return false;
}

bool TabletClient::Put(uint32_t tid, uint32_t pid, uint64_t time, const std::string& value,
const std::vector<std::pair<std::string, uint32_t>>& dimensions) {
base::Status TabletClient::Put(uint32_t tid, uint32_t pid, uint64_t time, const std::string& value,
const std::vector<std::pair<std::string, uint32_t>>& dimensions,
int memory_usage_limit) {
::google::protobuf::RepeatedPtrField<::openmldb::api::Dimension> pb_dimensions;
for (size_t i = 0; i < dimensions.size(); i++) {
::openmldb::api::Dimension* d = pb_dimensions.Add();
d->set_key(dimensions[i].first);
d->set_idx(dimensions[i].second);
}
return Put(tid, pid, time, base::Slice(value), &pb_dimensions);
return Put(tid, pid, time, base::Slice(value), &pb_dimensions, memory_usage_limit);
}

bool TabletClient::Put(uint32_t tid, uint32_t pid, uint64_t time, const base::Slice& value,
::google::protobuf::RepeatedPtrField<::openmldb::api::Dimension>* dimensions) {
base::Status TabletClient::Put(uint32_t tid, uint32_t pid, uint64_t time, const base::Slice& value,
::google::protobuf::RepeatedPtrField<::openmldb::api::Dimension>* dimensions,
int memory_usage_limit) {
::openmldb::api::PutRequest request;
if (memory_usage_limit < 0 || memory_usage_limit > 100) {
return {base::ReturnCode::kError, absl::StrCat("invalid memory_usage_limit ", memory_usage_limit)};
} else if (memory_usage_limit > 0) {
request.set_memory_limit(memory_usage_limit);
}
request.set_time(time);
request.set_value(value.data(), value.size());
request.set_tid(tid);
request.set_pid(pid);
request.mutable_dimensions()->Swap(dimensions);
::openmldb::api::PutResponse response;
bool ok =
client_.SendRequest(&::openmldb::api::TabletServer_Stub::Put, &request, &response, FLAGS_request_timeout_ms, 1);
if (ok && response.code() == 0) {
return true;
auto st = client_.SendRequestSt(&::openmldb::api::TabletServer_Stub::Put,
&request, &response, FLAGS_request_timeout_ms, 1);
if (!st.OK()) {
return st;
}
LOG(WARNING) << "fail to send write request for " << response.msg() << " and error code " << response.code();
return false;
return {response.code(), response.msg()};
}

bool TabletClient::Put(uint32_t tid, uint32_t pid, const std::string& pk, uint64_t time, const std::string& value) {
base::Status TabletClient::Put(uint32_t tid, uint32_t pid, const std::string& pk, uint64_t time,
const std::string& value) {
::openmldb::api::PutRequest request;
auto dim = request.add_dimensions();
dim->set_key(pk);
Expand All @@ -240,14 +247,12 @@ bool TabletClient::Put(uint32_t tid, uint32_t pid, const std::string& pk, uint64
request.set_tid(tid);
request.set_pid(pid);
::openmldb::api::PutResponse response;

bool ok =
client_.SendRequest(&::openmldb::api::TabletServer_Stub::Put, &request, &response, FLAGS_request_timeout_ms, 1);
if (ok && response.code() == 0) {
return true;
auto st = client_.SendRequestSt(&::openmldb::api::TabletServer_Stub::Put,
&request, &response, FLAGS_request_timeout_ms, 1);
if (!st.OK()) {
return st;
}
LOG(WARNING) << "fail to put for error " << response.msg();
return false;
return {response.code(), response.msg()};
}

bool TabletClient::MakeSnapshot(uint32_t tid, uint32_t pid, uint64_t offset, std::shared_ptr<TaskInfo> task_info) {
Expand Down
Loading

0 comments on commit 7b88a80

Please sign in to comment.