From 8f19aebb85e1673e7897b7c12d26943ff15c6811 Mon Sep 17 00:00:00 2001 From: dl239 Date: Thu, 22 Feb 2024 16:06:46 +0800 Subject: [PATCH] feat: delete data from all index (#3693) --- docs/en/openmldb_sql/dml/DELETE_STATEMENT.md | 1 - docs/en/quickstart/function_boundary.md | 57 --- docs/zh/openmldb_sql/dml/DELETE_STATEMENT.md | 1 - docs/zh/openmldb_sql/notice.md | 2 +- docs/zh/quickstart/function_boundary.md | 57 --- docs/zh/quickstart/openmldb_quickstart.md | 2 - src/catalog/client_manager.cc | 79 ++++ src/catalog/client_manager.h | 82 +--- src/catalog/distribute_iterator.cc | 2 +- src/catalog/tablet_catalog.h | 3 + src/client/tablet_client.cc | 24 +- src/client/tablet_client.h | 5 +- src/cmd/sql_cmd_test.cc | 122 +++++- src/proto/tablet.proto | 1 + src/sdk/node_adapter.cc | 9 +- src/sdk/node_adapter.h | 12 +- src/sdk/node_adapter_test.cc | 44 +- src/sdk/option.h | 41 ++ src/sdk/sql_cluster_router.cc | 55 ++- src/sdk/sql_cluster_router.h | 2 +- src/sdk/sql_cluster_test.cc | 41 -- src/storage/disk_table.h | 7 +- src/storage/mem_table.cc | 10 +- src/storage/mem_table.h | 5 +- src/storage/schema.h | 12 +- src/storage/segment.cc | 80 ++-- src/storage/segment.h | 6 +- src/tablet/tablet_impl.cc | 406 ++++++++++++------- src/tablet/tablet_impl.h | 9 + src/tablet/tablet_impl_test.cc | 2 + 30 files changed, 680 insertions(+), 499 deletions(-) create mode 100644 src/sdk/option.h diff --git a/docs/en/openmldb_sql/dml/DELETE_STATEMENT.md b/docs/en/openmldb_sql/dml/DELETE_STATEMENT.md index 60914052bfd..08d8349aa36 100644 --- a/docs/en/openmldb_sql/dml/DELETE_STATEMENT.md +++ b/docs/en/openmldb_sql/dml/DELETE_STATEMENT.md @@ -11,7 +11,6 @@ TableName ::= ``` **Description** -- `DELETE` statement will delete data fulfilling specific requirements in online table, not all data from the index. Only index related to where condition will be deleted. For more examples please check [function_boundary](../../quickstart/function_boundary.md#delete). 
- The filter columns specified by `WHERE` must be an index column. if it is a key column, only `=` can be used. ## Examples diff --git a/docs/en/quickstart/function_boundary.md b/docs/en/quickstart/function_boundary.md index 9c2c0b7ae14..b9d55864874 100644 --- a/docs/en/quickstart/function_boundary.md +++ b/docs/en/quickstart/function_boundary.md @@ -70,63 +70,6 @@ It is recommended to use HDFS files as source data. This approach allows for suc - In local mode, TaskManager can successfully import source data only if the source data is placed on the same host as the TaskManager process. - When TaskManager is in Yarn mode (both client and cluster), a file path cannot be used as the source data address because it is not known on which host the container is running. -### DELETE - -In tables with multiple indexes in the online storage, a `DELETE` operation may not delete corresponding data in all indexes. Consequently, there may be situations where data has been deleted, but the deleted data can still be found. 
- -For example: - -```SQL -create database db; -use db; -create table t1(c1 int, c2 int,index(key=c1),index(key=c2)); -desc t1; -set @@execute_mode='online'; -insert into t1 values (1,1),(2,2); -delete from t1 where c2=2; -select * from t1; -select * from t1 where c2=2; -``` - -The results are as follows: - -```Plain - --- ------- ------ ------ --------- - Field Type Null Default - --- ------- ------ ------ --------- - 1 c1 Int YES - 2 c2 Int YES - --- ------- ------ ------ --------- - --- -------------------- ------ ---- ------ --------------- - name keys ts ttl ttl_type - --- -------------------- ------ ---- ------ --------------- - 1 INDEX_0_1668504212 c1 - 0min kAbsoluteTime - 2 INDEX_1_1668504212 c2 - 0min kAbsoluteTime - --- -------------------- ------ ---- ------ --------------- - -------------- - storage_mode - -------------- - Memory - -------------- - ---- ---- - c1 c2 - ---- ---- - 1 1 - 2 2 - ---- ---- - -2 rows in set - ---- ---- - c1 c2 - ---- ---- - -0 rows in set -``` - -Explanation: - -Table `t1` has multiple indexes (which may be automatically created during `DEPLOY`). If you run `delete from t1 where c2=2`, it only deletes data in the second index, while the data in the first index remains unaffected. Therefore, if you subsequently run `select * from t1` and it uses the first index, there are two pieces of data that haven't been deleted. `select * from t1 where c2=2` uses the second index, and the result is empty, with data being successfully deleted. - ## DQL Boundary The supported query modes (i.e. 
`SELECT` statements) vary depending on the execution mode: diff --git a/docs/zh/openmldb_sql/dml/DELETE_STATEMENT.md b/docs/zh/openmldb_sql/dml/DELETE_STATEMENT.md index 716af9351fd..729f32b53a1 100644 --- a/docs/zh/openmldb_sql/dml/DELETE_STATEMENT.md +++ b/docs/zh/openmldb_sql/dml/DELETE_STATEMENT.md @@ -12,7 +12,6 @@ TableName ::= **说明** -- `DELETE` 语句删除在线表满足指定条件的数据,删除并不是所有索引中满足条件的数据都被删除,只会删除与where condition相关的索引,示例见[功能边界](../../quickstart/function_boundary.md#delete)。 - `WHERE` 指定的筛选列必须是索引列。如果是key列只能用等于 ## Examples diff --git a/docs/zh/openmldb_sql/notice.md b/docs/zh/openmldb_sql/notice.md index c9705ca3a1f..43629fd0b3e 100644 --- a/docs/zh/openmldb_sql/notice.md +++ b/docs/zh/openmldb_sql/notice.md @@ -7,7 +7,7 @@ | CREATE TABLE | 1. 在建表语句中如果没有指定索引,默认会自动创建一个`absolute 0`的索引。这个索引下的数据永不过期,可能会占用大量内存
2. 磁盘表`absandlat`和`absorlat`类型没有过期删除 | DROP TABLE | 1. 删除表默认是异步操作,执行完成后,异步删除表中的数据
2. 如果有分片在做snapshot, 会删除失败。可能存在部分分片删除部分没有删除的情况
3. 删除时默认会把数据目录放到recycle目录下。tablet的配置文件中`recycle_bin_enabled`参数可以配置是否要放到recycle, 默认是开启的
4. 由于内存碎片问题,释放的内存不一定完全释放给操作系统 | INSERT | 如果返回失败,可能有一部分数据已经插入进去 -| DELETE | 1. 删除的数据不会立马从内存中物理删除,需要等一个过期删除时间间隔(即参数 `gc_interval`)
2. 如果设置了长窗口,不会更新预聚合表里的数据 +| DELETE | 删除的数据不会立马从内存中物理删除,需要等一个过期删除时间间隔(即参数 `gc_interval`) | CREATE INDEX | 1. 创建索引是一个异步操作,如果表里有数据需要等一段时间 `desc` 命令才能显示出来
2. 在创建索引的过程中如果有写操作,那么可能会有部分新写入的数据在新加的索引上查询不出来
3. 磁盘表不支持创建索引 | DROP INDEX | 1. 删除一个索引之后,如果要再重新创建相同的索引需要等两个过期删除时间间隔(即参数 `gc_interval`)
2. 执行该命令后,内存中的索引并没有被真正的马上删除,需要等两个过期删除时间间隔才会在内存中真正被执行删除动作
3. 磁盘表不支持删除索引 | DEPLOY | 1. DEPLOY 命令可能会修改相关表的TTL,执行DEPLOY前导入的数据可能在新TTL生效前被淘汰,新的TTL生效时间为2个`gc_interval`
2. 在deployment关联的表中,如果有磁盘表需要添加索引,那么部署会失败,可能有部分索引已经添加成功 diff --git a/docs/zh/quickstart/function_boundary.md b/docs/zh/quickstart/function_boundary.md index 30fa4285a59..ea0d4ae9be8 100644 --- a/docs/zh/quickstart/function_boundary.md +++ b/docs/zh/quickstart/function_boundary.md @@ -82,63 +82,6 @@ spark.default.conf=spark.port.maxRetries=32;foo=bar 单个任务最大的并发数限制为`spark.executor.instances`*`spark.executor.cores`,请调整这两个配置。当spark.master=local时,调整driver的,而不是executor的。 -### DELETE - -在线存储的表有多索引,`DELETE` 可能无法删除所有索引中的对应数据,所以,可能出现删除了数据,却能查出已删除数据的情况。 - -举例: - -```SQL -create database db; -use db; -create table t1(c1 int, c2 int,index(key=c1),index(key=c2)); -desc t1; -set @@execute_mode='online'; -insert into t1 values (1,1),(2,2); -delete from t1 where c2=2; -select * from t1; -select * from t1 where c2=2; -``` - -结果如下: - -```Plain - --- ------- ------ ------ --------- - Field Type Null Default - --- ------- ------ ------ --------- - 1 c1 Int YES - 2 c2 Int YES - --- ------- ------ ------ --------- - --- -------------------- ------ ---- ------ --------------- - name keys ts ttl ttl_type - --- -------------------- ------ ---- ------ --------------- - 1 INDEX_0_1668504212 c1 - 0min kAbsoluteTime - 2 INDEX_1_1668504212 c2 - 0min kAbsoluteTime - --- -------------------- ------ ---- ------ --------------- - -------------- - storage_mode - -------------- - Memory - -------------- - ---- ---- - c1 c2 - ---- ---- - 1 1 - 2 2 - ---- ---- - -2 rows in set - ---- ---- - c1 c2 - ---- ---- - -0 rows in set -``` - -说明: - -表 `t1` 有多个索引(`DEPLOY` 也可能自动创建出多索引),`delete from t1 where c2=2` 实际只删除了第二个 index 的数据,第一个 index 数据没有被影响。这是因为delete的where condition只与第二个index相关,第一个index中没有任何该condition相关的key或ts。而 `select * from t1` 使用第一个索引,并非第二个,结果就会有两条数据,直观感受为delete失败了;`select * from t1 where c2=2` 使用第二个索引,结果为空,证明数据在该索引下已被删除。 - ## DQL 边界 根据执行模式的不同,支持的查询模式(即 `SELECT` 语句)也有所不同: diff --git a/docs/zh/quickstart/openmldb_quickstart.md b/docs/zh/quickstart/openmldb_quickstart.md index 77b1c1e29c1..12ec5724f16 
100644 --- a/docs/zh/quickstart/openmldb_quickstart.md +++ b/docs/zh/quickstart/openmldb_quickstart.md @@ -150,8 +150,6 @@ SET @@execute_mode='online'; SELECT * FROM demo_table1 LIMIT 10; ``` -注意,目前要求成功完成 SQL 上线部署后,才能导入在线数据;如果先导入在线数据,会导致部署出错。 - ```{note} 本篇教程在数据导入以后,略过了实时数据接入的步骤。在实际场景中,由于现实时间的推移,需要将最新的实时数据更新到在线数据库。具体可以通过 OpenMLDB SDK 或者在线数据源 connector 实现(如 Kafka、Pulsar 等)。 ``` diff --git a/src/catalog/client_manager.cc b/src/catalog/client_manager.cc index d85de6c6c95..2930f17925e 100644 --- a/src/catalog/client_manager.cc +++ b/src/catalog/client_manager.cc @@ -324,6 +324,25 @@ std::shared_ptr<::hybridse::vm::TableHandler> TabletAccessor::SubQuery(uint32_t } return async_table_handler; } + +void TabletsAccessor::AddTabletAccessor(std::shared_ptr accessor) { + if (!accessor) { + LOG(WARNING) << "Fail to add null tablet accessor"; + return; + } + auto iter = name_idx_map_.find(accessor->GetName()); + if (iter == name_idx_map_.cend()) { + accessors_.push_back(accessor); + name_idx_map_.insert(std::make_pair(accessor->GetName(), accessors_.size() - 1)); + posinfos_.push_back(std::vector({rows_cnt_})); + assign_accessor_idxs_.push_back(accessors_.size() - 1); + } else { + posinfos_[iter->second].push_back(rows_cnt_); + assign_accessor_idxs_.push_back(iter->second); + } + rows_cnt_++; +} + std::shared_ptr TabletsAccessor::SubQuery(uint32_t task_id, const std::string& db, const std::string& sql, const hybridse::codec::Row& row, @@ -331,6 +350,7 @@ std::shared_ptr TabletsAccessor::SubQuery(uint32_t tas return std::make_shared<::hybridse::vm::ErrorRowHandler>(::hybridse::common::kRpcError, "TabletsAccessor Unsupport SubQuery with request"); } + std::shared_ptr TabletsAccessor::SubQuery(uint32_t task_id, const std::string& db, const std::string& sql, const std::set& common_column_indices, @@ -350,6 +370,7 @@ std::shared_ptr TabletsAccessor::SubQuery(uint32_t t } return tables_handler; } + PartitionClientManager::PartitionClientManager(uint32_t pid, const std::shared_ptr& 
leader, const std::vector>& followers) : pid_(pid), leader_(leader), followers_(followers), rand_(0xdeadbeef) {} @@ -406,6 +427,29 @@ TableClientManager::TableClientManager(const ::openmldb::storage::TableSt& table } } +void TableClientManager::Show() const { + DLOG(INFO) << "show client manager "; + for (size_t id = 0; id < partition_managers_.size(); id++) { + auto pmg = std::atomic_load_explicit(&partition_managers_[id], std::memory_order_relaxed); + if (pmg) { + if (pmg->GetLeader()) { + DLOG(INFO) << "partition managers (pid, leader) " << id << ", " << pmg->GetLeader()->GetName(); + } else { + DLOG(INFO) << "partition managers (pid, leader) " << id << ", null leader"; + } + } else { + DLOG(INFO) << "partition managers (pid, leader) " << id << ", null mamanger"; + } + } +} + +std::shared_ptr TableClientManager::GetPartitionClientManager(uint32_t pid) const { + if (pid < partition_managers_.size()) { + return std::atomic_load_explicit(&partition_managers_[pid], std::memory_order_relaxed); + } + return std::shared_ptr(); +} + bool TableClientManager::UpdatePartitionClientManager(const ::openmldb::storage::PartitionSt& partition, const ClientManager& client_manager) { uint32_t pid = partition.GetPid(); @@ -429,6 +473,41 @@ bool TableClientManager::UpdatePartitionClientManager(const ::openmldb::storage: return true; } +std::shared_ptr TableClientManager::GetTablet(uint32_t pid) const { + auto partition_manager = GetPartitionClientManager(pid); + if (partition_manager) { + return partition_manager->GetLeader(); + } + return std::shared_ptr(); +} + +std::vector> TableClientManager::GetTabletFollowers(uint32_t pid) const { + auto partition_manager = GetPartitionClientManager(pid); + if (partition_manager) { + return partition_manager->GetFollowers(); + } + return {}; +} + +std::shared_ptr TableClientManager::GetTablet(std::vector pids) const { + auto tablets_accessor = std::make_shared(); + for (size_t idx = 0; idx < pids.size(); idx++) { + auto partition_manager = 
GetPartitionClientManager(pids[idx]); + if (partition_manager) { + auto leader = partition_manager->GetLeader(); + if (!leader) { + LOG(WARNING) << "fail to get TabletsAccessor, null tablet for pid " << pids[idx]; + return std::shared_ptr(); + } + tablets_accessor->AddTabletAccessor(partition_manager->GetLeader()); + } else { + LOG(WARNING) << "fail to get tablet: pid " << pids[idx] << " not exist"; + return std::shared_ptr(); + } + } + return tablets_accessor; +} + std::shared_ptr ClientManager::GetTablet(const std::string& name) const { std::lock_guard<::openmldb::base::SpinMutex> lock(mu_); auto it = clients_.find(name); diff --git a/src/catalog/client_manager.h b/src/catalog/client_manager.h index 10a9e7d60bf..7e913e68ce5 100644 --- a/src/catalog/client_manager.h +++ b/src/catalog/client_manager.h @@ -56,6 +56,7 @@ class TabletRowHandler : public ::hybridse::vm::RowHandler { ::hybridse::codec::Row row_; openmldb::RpcCallback* callback_; }; + class AsyncTableHandler : public ::hybridse::vm::MemTableHandler { public: explicit AsyncTableHandler(openmldb::RpcCallback* callback, @@ -91,6 +92,7 @@ class AsyncTableHandler : public ::hybridse::vm::MemTableHandler { openmldb::RpcCallback* callback_; bool request_is_common_; }; + class AsyncTablesHandler : public ::hybridse::vm::MemTableHandler { public: AsyncTablesHandler(); @@ -169,28 +171,14 @@ class TabletAccessor : public ::hybridse::vm::Tablet { std::string name_; std::shared_ptr<::openmldb::client::TabletClient> tablet_client_; }; + class TabletsAccessor : public ::hybridse::vm::Tablet { public: TabletsAccessor() : name_("TabletsAccessor"), rows_cnt_(0) {} ~TabletsAccessor() {} const std::string& GetName() const { return name_; } - void AddTabletAccessor(std::shared_ptr accessor) { - if (!accessor) { - LOG(WARNING) << "Fail to add null tablet accessor"; - return; - } - auto iter = name_idx_map_.find(accessor->GetName()); - if (iter == name_idx_map_.cend()) { - accessors_.push_back(accessor); - 
name_idx_map_.insert(std::make_pair(accessor->GetName(), accessors_.size() - 1)); - posinfos_.push_back(std::vector({rows_cnt_})); - assign_accessor_idxs_.push_back(accessors_.size() - 1); - } else { - posinfos_[iter->second].push_back(rows_cnt_); - assign_accessor_idxs_.push_back(iter->second); - } - rows_cnt_++; - } + void AddTabletAccessor(std::shared_ptr accessor); + std::shared_ptr SubQuery(uint32_t task_id, const std::string& db, const std::string& sql, const hybridse::codec::Row& row, const bool is_procedure, const bool is_debug) override; @@ -209,6 +197,7 @@ class TabletsAccessor : public ::hybridse::vm::Tablet { std::vector> posinfos_; std::map name_idx_map_; }; + class PartitionClientManager { public: PartitionClientManager(uint32_t pid, const std::shared_ptr& leader, @@ -235,65 +224,18 @@ class TableClientManager { TableClientManager(const ::openmldb::storage::TableSt& table_st, const ClientManager& client_manager); - void Show() const { - DLOG(INFO) << "show client manager "; - for (size_t id = 0; id < partition_managers_.size(); id++) { - auto pmg = std::atomic_load_explicit(&partition_managers_[id], std::memory_order_relaxed); - if (pmg) { - if (pmg->GetLeader()) { - DLOG(INFO) << "partition managers (pid, leader) " << id << ", " << pmg->GetLeader()->GetName(); - } else { - DLOG(INFO) << "partition managers (pid, leader) " << id << ", null leader"; - } - } else { - DLOG(INFO) << "partition managers (pid, leader) " << id << ", null mamanger"; - } - } - } - std::shared_ptr GetPartitionClientManager(uint32_t pid) const { - if (pid < partition_managers_.size()) { - return std::atomic_load_explicit(&partition_managers_[pid], std::memory_order_relaxed); - } - return std::shared_ptr(); - } + void Show() const; + + std::shared_ptr GetPartitionClientManager(uint32_t pid) const; bool UpdatePartitionClientManager(const ::openmldb::storage::PartitionSt& partition, const ClientManager& client_manager); - std::shared_ptr GetTablet(uint32_t pid) const { - auto 
partition_manager = GetPartitionClientManager(pid); - if (partition_manager) { - return partition_manager->GetLeader(); - } - return std::shared_ptr(); - } + std::shared_ptr GetTablet(uint32_t pid) const; - std::vector> GetTabletFollowers(uint32_t pid) const { - auto partition_manager = GetPartitionClientManager(pid); - if (partition_manager) { - return partition_manager->GetFollowers(); - } - return {}; - } + std::vector> GetTabletFollowers(uint32_t pid) const; - std::shared_ptr GetTablet(std::vector pids) const { - std::shared_ptr tablets_accessor = std::shared_ptr(new TabletsAccessor()); - for (size_t idx = 0; idx < pids.size(); idx++) { - auto partition_manager = GetPartitionClientManager(pids[idx]); - if (partition_manager) { - auto leader = partition_manager->GetLeader(); - if (!leader) { - LOG(WARNING) << "fail to get TabletsAccessor, null tablet for pid " << pids[idx]; - return std::shared_ptr(); - } - tablets_accessor->AddTabletAccessor(partition_manager->GetLeader()); - } else { - LOG(WARNING) << "fail to get tablet: pid " << pids[idx] << " not exist"; - return std::shared_ptr(); - } - } - return tablets_accessor; - } + std::shared_ptr GetTablet(std::vector pids) const; private: std::vector> partition_managers_; diff --git a/src/catalog/distribute_iterator.cc b/src/catalog/distribute_iterator.cc index b82afbb81fd..032d3ec75f2 100644 --- a/src/catalog/distribute_iterator.cc +++ b/src/catalog/distribute_iterator.cc @@ -423,7 +423,7 @@ const ::hybridse::codec::Row& RemoteWindowIterator::GetValue() { memcpy(copyed_row_data, slice_row.data(), sz); auto shared_slice = ::hybridse::base::RefCountedSlice::CreateManaged(copyed_row_data, sz); row_.Reset(shared_slice); - LOG(INFO) << "get value pk " << pk_ << " ts_key " << kv_it_->GetKey() << " ts " << ts_; + DLOG(INFO) << "get value pk " << pk_ << " ts_key " << kv_it_->GetKey() << " ts " << ts_; valid_value_ = true; return row_; } diff --git a/src/catalog/tablet_catalog.h b/src/catalog/tablet_catalog.h index 
7d834147591..0a054a869db 100644 --- a/src/catalog/tablet_catalog.h +++ b/src/catalog/tablet_catalog.h @@ -175,6 +175,7 @@ class TabletTableHandler : public ::hybridse::vm::TableHandler, const std::vector &pks) override; inline uint32_t GetTid() { return table_st_.GetTid(); } + inline uint32_t GetPartitionNum() { return partition_num_; } void AddTable(std::shared_ptr<::openmldb::storage::Table> table); @@ -185,6 +186,8 @@ class TabletTableHandler : public ::hybridse::vm::TableHandler, bool Update(const ::openmldb::nameserver::TableInfo &meta, const ClientManager &client_manager, bool* index_updated); + std::shared_ptr GetTableClientManager() { return table_client_manager_; } + private: inline int32_t GetColumnIndex(const std::string &column) { auto it = types_.find(column); diff --git a/src/client/tablet_client.cc b/src/client/tablet_client.cc index f445cc1791c..f6e8406add2 100644 --- a/src/client/tablet_client.cc +++ b/src/client/tablet_client.cc @@ -815,28 +815,28 @@ bool TabletClient::Delete(uint32_t tid, uint32_t pid, const std::string& pk, con return true; } -base::Status TabletClient::Delete(uint32_t tid, uint32_t pid, const std::map& index_val, - const std::string& ts_name, const std::optional start_ts, const std::optional& end_ts) { +base::Status TabletClient::Delete(uint32_t tid, uint32_t pid, const sdk::DeleteOption& option, uint64_t timeout_ms) { ::openmldb::api::DeleteRequest request; ::openmldb::api::GeneralResponse response; request.set_tid(tid); request.set_pid(pid); - for (const auto& kv : index_val) { + if (option.idx.has_value()) { auto dimension = request.add_dimensions(); - dimension->set_idx(kv.first); - dimension->set_key(kv.second); + dimension->set_idx(option.idx.value()); + dimension->set_key(option.key); } - if (start_ts.has_value()) { - request.set_ts(start_ts.value()); + if (option.start_ts.has_value()) { + request.set_ts(option.start_ts.value()); } - if (end_ts.has_value()) { - request.set_end_ts(end_ts.value()); + if 
(option.end_ts.has_value()) { + request.set_end_ts(option.end_ts.value()); } - if (!ts_name.empty()) { - request.set_ts_name(ts_name); + if (!option.ts_name.empty()) { + request.set_ts_name(option.ts_name); } + request.set_enable_decode_value(option.enable_decode_value); bool ok = client_.SendRequest(&::openmldb::api::TabletServer_Stub::Delete, &request, &response, - FLAGS_request_timeout_ms, 1); + timeout_ms, 1); if (!ok || response.code() != 0) { return {base::ReturnCode::kError, response.msg()}; } diff --git a/src/client/tablet_client.h b/src/client/tablet_client.h index 19579f90c5c..632f75f3510 100644 --- a/src/client/tablet_client.h +++ b/src/client/tablet_client.h @@ -31,6 +31,7 @@ #include "codec/schema_codec.h" #include "proto/tablet.pb.h" #include "rpc/rpc_client.h" +#include "sdk/option.h" namespace openmldb { @@ -92,9 +93,7 @@ class TabletClient : public Client { bool Delete(uint32_t tid, uint32_t pid, const std::string& pk, const std::string& idx_name, std::string& msg); // NOLINT - base::Status Delete(uint32_t tid, uint32_t pid, const std::map& index_val, - const std::string& ts_name, const std::optional start_ts, - const std::optional& end_ts); + base::Status Delete(uint32_t tid, uint32_t pid, const sdk::DeleteOption& option, uint64_t timeout_ms); bool Count(uint32_t tid, uint32_t pid, const std::string& pk, const std::string& idx_name, bool filter_expired_data, uint64_t& value, std::string& msg); // NOLINT diff --git a/src/cmd/sql_cmd_test.cc b/src/cmd/sql_cmd_test.cc index d6e9e811fa2..b575053324d 100644 --- a/src/cmd/sql_cmd_test.cc +++ b/src/cmd/sql_cmd_test.cc @@ -1238,9 +1238,129 @@ TEST_P(DBSDKTest, DeletetSameColIndex) { auto res = sr->ExecuteSQL(absl::StrCat("select * from ", table_name, ";"), &status); ASSERT_EQ(res->Size(), 100); - ProcessSQLs(sr, {absl::StrCat("delete from ", table_name, " where c1 = 'key2';")}); + sr->ExecuteSQL(absl::StrCat("delete from ", table_name, " where c1 = 'key2';"), &status); + ASSERT_TRUE(status.IsOK()) << 
status.msg; + res = sr->ExecuteSQL(absl::StrCat("select * from ", table_name, ";"), &status); + ASSERT_EQ(res->Size(), 90); + ProcessSQLs(sr, { + absl::StrCat("drop table ", table_name), + absl::StrCat("drop database ", db_name), + }); +} + +TEST_P(DBSDKTest, TestDelete) { + auto cli = GetParam(); + sr = cli->sr; + std::string name = "test" + GenRand(); + ::hybridse::sdk::Status status; + std::string ddl; + + std::string db = "db" + GenRand(); + ASSERT_TRUE(sr->CreateDB(db, &status)); + sr->ExecuteSQL(db, "set @@execute_mode = 'online';", &status); + sr->ExecuteSQL(db, "use " + db + " ;", &status); + ddl = absl::StrCat("create table ", name, + "(col1 string, col2 string, col3 string, col4 bigint, col5 bigint, col6 bigint, col7 string," + "index(key=col1, ts=col4), index(key=(col1, col2), ts=col4), index(key=col3, ts=col5));"); + ASSERT_TRUE(sr->ExecuteDDL(db, ddl, &status)) << "ddl: " << ddl; + ASSERT_TRUE(sr->RefreshCatalog()); + for (int i = 0; i < 10; i++) { + std::string key1 = absl::StrCat("key1_", i); + std::string key2 = absl::StrCat("key2_", i); + std::string key3 = absl::StrCat("key3_", i); + for (int j = 0; j < 10; j++) { + sr->ExecuteSQL(absl::StrCat("insert into ", name, + " values ('", key1, "', '", key2, "', '", key3, "', ", 100 + j, ",", 1000 + j, ", 1, 'v');"), + &status); + } + } + auto rs = sr->ExecuteSQL(db, "select * from " + name + ";", &status); + ASSERT_EQ(rs->Size(), 100); + rs = sr->ExecuteSQL(db, "delete from " + name + " where col1 = 'xxx' and col5 > 100;", &status); + ASSERT_FALSE(status.IsOK()); + rs = sr->ExecuteSQL(db, "delete from " + name + " where col1 = 'xxx' and col6 > 100;", &status); + ASSERT_FALSE(status.IsOK()); + rs = sr->ExecuteSQL(db, "delete from " + name + " where col1 = 'xxx' and col3 = 'aaa';", &status); + ASSERT_FALSE(status.IsOK()); + rs = sr->ExecuteSQL(db, "delete from " + name + " where col7 = 'xxx' and col3 = 'aaa';", &status); + ASSERT_FALSE(status.IsOK()); + sr->ExecuteSQL(db, "delete from " + name + " where 
col6 > 100;", &status); + ASSERT_FALSE(status.IsOK()); + rs = sr->ExecuteSQL(db, "delete from " + name + " where col1 = 'key1_1';", &status); + ASSERT_TRUE(status.IsOK()); + rs = sr->ExecuteSQL(db, "select * from " + name + " where col1 = 'key1_1';", &status); + ASSERT_EQ(rs->Size(), 0); + rs = sr->ExecuteSQL(db, "select * from " + name + " where col1 = 'key1_1' and col2 = 'key2_1';", &status); + ASSERT_EQ(rs->Size(), 0); + rs = sr->ExecuteSQL(db, "select * from " + name + " where col3 = 'key3_1';", &status); + ASSERT_EQ(rs->Size(), 0); + sr->ExecuteSQL(db, "delete from " + name + " where col4 > 105;", &status); + ASSERT_TRUE(status.IsOK()); + rs = sr->ExecuteSQL(db, "select * from " + name + " where col1 = 'key1_2';", &status); + ASSERT_EQ(rs->Size(), 6); + rs = sr->ExecuteSQL(db, "select * from " + name + " where col1 = 'key1_2' and col2 = 'key2_2';", &status); + ASSERT_EQ(rs->Size(), 6); + rs = sr->ExecuteSQL(db, "select * from " + name + " where col3 = 'key3_2';", &status); + ASSERT_EQ(rs->Size(), 6); + + ASSERT_TRUE(sr->ExecuteDDL(db, "drop table " + name + ";", &status)); + ASSERT_TRUE(sr->DropDB(db, &status)); +} + + +TEST_P(DBSDKTest, DeletetMulIndex) { + auto cli = GetParam(); + sr = cli->sr; + std::string db_name = "test2"; + std::string table_name = "test1"; + std::string ddl = + "create table test1 (c1 string, c2 string, c3 bigint, c4 bigint, " + "INDEX(KEY=c1, ts=c3), INDEX(KEY=c2, ts=c4));"; + ProcessSQLs(sr, { + "set @@execute_mode = 'online'", + absl::StrCat("create database ", db_name, ";"), + absl::StrCat("use ", db_name, ";"), + ddl, + }); + hybridse::sdk::Status status; + for (int i = 0; i < 10; i++) { + std::string key1 = absl::StrCat("key1_", i); + std::string key2 = absl::StrCat("key2_", i); + for (int j = 0; j < 10; j++) { + uint64_t ts = 1000 + j; + sr->ExecuteSQL(absl::StrCat("insert into ", table_name, + " values ('", key1, "', '", key2, "', ", ts, ",", ts, ");"), + &status); + } + } + + auto res = sr->ExecuteSQL(absl::StrCat("select * 
from ", table_name, ";"), &status); + ASSERT_EQ(res->Size(), 100); + res = sr->ExecuteSQL(absl::StrCat("select * from ", table_name, " where c1 = \'key1_2\';"), &status); + ASSERT_EQ(res->Size(), 10); + res = sr->ExecuteSQL(absl::StrCat("select * from ", table_name, " where c2 = \'key2_2\';"), &status); + ASSERT_EQ(res->Size(), 10); + sr->ExecuteSQL(absl::StrCat("delete from ", table_name, " where c1 = 'key1_2' and c3 = 1001;"), &status); + res = sr->ExecuteSQL(absl::StrCat("select * from ", table_name, " where c1 = \'key1_2\';"), &status); + ASSERT_EQ(res->Size(), 9); + res = sr->ExecuteSQL(absl::StrCat("select * from ", table_name, " where c2 = \'key2_2\';"), &status); + ASSERT_EQ(res->Size(), 9); + sr->ExecuteSQL(absl::StrCat("delete from ", table_name, " where c1 = 'key1_2';"), &status); + ASSERT_TRUE(status.IsOK()) << status.msg; res = sr->ExecuteSQL(absl::StrCat("select * from ", table_name, ";"), &status); ASSERT_EQ(res->Size(), 90); + res = sr->ExecuteSQL(absl::StrCat("select * from ", table_name, " where c1 = \'key1_2\';"), &status); + ASSERT_EQ(res->Size(), 0); + res = sr->ExecuteSQL(absl::StrCat("select * from ", table_name, " where c2 = \'key2_2\';"), &status); + ASSERT_EQ(res->Size(), 0); + sr->ExecuteSQL(absl::StrCat("delete from ", table_name, " where c3 >= 1005 ;"), &status); + ASSERT_TRUE(status.IsOK()) << status.msg; + res = sr->ExecuteSQL(absl::StrCat("select * from ", table_name, ";"), &status); + ASSERT_EQ(res->Size(), 45); + res = sr->ExecuteSQL(absl::StrCat("select * from ", table_name, " where c1 = \'key1_3\';"), &status); + ASSERT_EQ(res->Size(), 5); + res = sr->ExecuteSQL(absl::StrCat("select * from ", table_name, " where c2 = \'key2_3\';"), &status); + ASSERT_EQ(res->Size(), 5); ProcessSQLs(sr, { absl::StrCat("drop table ", table_name), absl::StrCat("drop database ", db_name), diff --git a/src/proto/tablet.proto b/src/proto/tablet.proto index a18714b2ae1..a1ae6e72d5a 100755 --- a/src/proto/tablet.proto +++ b/src/proto/tablet.proto @@ 
-212,6 +212,7 @@ message DeleteRequest { optional uint64 ts = 6; optional uint64 end_ts = 7; optional string ts_name = 8; + optional bool enable_decode_value = 9 [default = true]; } message ExecuteGcRequest { diff --git a/src/sdk/node_adapter.cc b/src/sdk/node_adapter.cc index 568de5bf05d..58a0b534b4e 100644 --- a/src/sdk/node_adapter.cc +++ b/src/sdk/node_adapter.cc @@ -133,9 +133,9 @@ hybridse::sdk::Status NodeAdapter::ExtractDeleteOption( } if (match_index_col > 0) { if (!option->ts_name.empty()) { - option->index_map.clear(); + option->idx.reset(); } - if (option->index_map.empty()) { + if (!option->idx.has_value()) { matched_column_key.CopyFrom(column_key); } else { if (column_key.col_name_size() != matched_column_key.col_name_size() || @@ -144,7 +144,8 @@ hybridse::sdk::Status NodeAdapter::ExtractDeleteOption( return {hybridse::common::StatusCode::kCmdError, "hit multiple indexs"}; } } - option->index_map.emplace(index_pos[column_key.index_name()], pk); + option->idx = index_pos[column_key.index_name()]; + option->key = pk; for (const auto& col : matched_column_key.col_name()) { hit_con_col.insert(col); } @@ -153,7 +154,7 @@ hybridse::sdk::Status NodeAdapter::ExtractDeleteOption( } } } - if (!option->ts_name.empty() && !option->index_map.empty() && option->ts_name != matched_column_key.ts_name()) { + if (!option->ts_name.empty() && option->idx.has_value() && option->ts_name != matched_column_key.ts_name()) { return {hybridse::common::StatusCode::kCmdError, "ts name mismatch"}; } for (const auto& con : condition_vec) { diff --git a/src/sdk/node_adapter.h b/src/sdk/node_adapter.h index fe39554ad51..319c7b1b1fe 100644 --- a/src/sdk/node_adapter.h +++ b/src/sdk/node_adapter.h @@ -27,22 +27,12 @@ #include "node/node_manager.h" #include "proto/name_server.pb.h" #include "proto/type.pb.h" +#include "sdk/option.h" #include "sdk/sql_delete_row.h" namespace openmldb { namespace sdk { -struct DeleteOption { - DeleteOption(const std::map& index, const std::string& name, 
- const std::optional& ts1, const std::optional& ts2) : - index_map(index), ts_name(name), start_ts(ts1), end_ts(ts2) {} - DeleteOption() = default; - std::map index_map; - std::string ts_name; - std::optional start_ts = std::nullopt; - std::optional end_ts = std::nullopt; -}; - class NodeAdapter { public: static bool TransformToTableDef(::hybridse::node::CreatePlanNode* create_node, diff --git a/src/sdk/node_adapter_test.cc b/src/sdk/node_adapter_test.cc index 70c35ff7d9c..be86d8a790f 100644 --- a/src/sdk/node_adapter_test.cc +++ b/src/sdk/node_adapter_test.cc @@ -124,11 +124,12 @@ static std::vector cases = { INSTANTIATE_TEST_SUITE_P(NodeAdapter, NodeAdapterTest, testing::ValuesIn(cases)); void CheckDeleteOption(const DeleteOption& option, const DeleteOption& expect_option) { - ASSERT_EQ(option.index_map.size(), expect_option.index_map.size()); - for (const auto& kv : option.index_map) { - auto iter = expect_option.index_map.find(kv.first); - ASSERT_TRUE(iter != expect_option.index_map.end()); - ASSERT_EQ(kv.second, iter->second); + if (option.idx.has_value()) { + ASSERT_TRUE(expect_option.idx.has_value()); + ASSERT_EQ(option.idx.value(), expect_option.idx.value()); + ASSERT_EQ(option.key, expect_option.key); + } else { + ASSERT_FALSE(expect_option.idx.has_value()); } ASSERT_EQ(expect_option.ts_name, option.ts_name); if (option.start_ts.has_value()) { @@ -143,6 +144,7 @@ void CheckDeleteOption(const DeleteOption& option, const DeleteOption& expect_op } else { ASSERT_FALSE(expect_option.end_ts.has_value()); } + ASSERT_EQ(option.enable_decode_value, expect_option.enable_decode_value); } struct DeleteOptionParm { @@ -165,51 +167,51 @@ TEST_P(DeleteOptionTest, TransformToTableInfo) { std::vector option_cases = { DeleteOptionParm({Condition("card", hybridse::node::FnOperator::kFnOpEq, "key1", type::DataType::kString)}, - DeleteOption({{0, "key1"}}, "", std::nullopt, std::nullopt)), + DeleteOption(0, "key1", "", std::nullopt, std::nullopt)), 
DeleteOptionParm({Condition("card", hybridse::node::FnOperator::kFnOpEq, "key1", type::DataType::kString), Condition("ts1", hybridse::node::FnOperator::kFnOpEq, "10", type::DataType::kBigInt)}, - DeleteOption({{0, "key1"}}, "ts1", 10, 9)), + DeleteOption(0, "key1", "ts1", 10, 9)), DeleteOptionParm({Condition("card", hybridse::node::FnOperator::kFnOpEq, "key1", type::DataType::kString), Condition("ts1", hybridse::node::FnOperator::kFnOpGe, "10", type::DataType::kBigInt)}, - DeleteOption({{0, "key1"}}, "ts1", std::nullopt, 9)), + DeleteOption(0, "key1", "ts1", std::nullopt, 9)), DeleteOptionParm({Condition("card", hybridse::node::FnOperator::kFnOpEq, "key1", type::DataType::kString), Condition("ts1", hybridse::node::FnOperator::kFnOpGt, "10", type::DataType::kBigInt)}, - DeleteOption({{0, "key1"}}, "ts1", std::nullopt, 10)), + DeleteOption(0, "key1", "ts1", std::nullopt, 10)), DeleteOptionParm({Condition("card", hybridse::node::FnOperator::kFnOpEq, "key1", type::DataType::kString), Condition("ts1", hybridse::node::FnOperator::kFnOpLt, "10", type::DataType::kBigInt)}, - DeleteOption({{0, "key1"}}, "ts1", 9, std::nullopt)), + DeleteOption(0, "key1", "ts1", 9, std::nullopt)), DeleteOptionParm({Condition("card", hybridse::node::FnOperator::kFnOpEq, "key1", type::DataType::kString), Condition("ts1", hybridse::node::FnOperator::kFnOpLe, "10", type::DataType::kBigInt)}, - DeleteOption({{0, "key1"}}, "ts1", 10, std::nullopt)), + DeleteOption(0, "key1", "ts1", 10, std::nullopt)), DeleteOptionParm({Condition("card", hybridse::node::FnOperator::kFnOpEq, "key1", type::DataType::kString), Condition("ts1", hybridse::node::FnOperator::kFnOpGe, "0", type::DataType::kBigInt)}, - DeleteOption({{0, "key1"}}, "ts1", std::nullopt, std::nullopt)), + DeleteOption(0, "key1", "ts1", std::nullopt, std::nullopt)), DeleteOptionParm({Condition("card", hybridse::node::FnOperator::kFnOpEq, "key1", type::DataType::kString), Condition("ts1", hybridse::node::FnOperator::kFnOpEq, "0", 
type::DataType::kBigInt)}, - DeleteOption({{0, "key1"}}, "ts1", 0, std::nullopt)), + DeleteOption(0, "key1", "ts1", 0, std::nullopt)), DeleteOptionParm({Condition("ts1", hybridse::node::FnOperator::kFnOpEq, "10", type::DataType::kBigInt)}, - DeleteOption({}, "ts1", 10, 9)), + DeleteOption(std::nullopt, "", "ts1", 10, 9)), DeleteOptionParm({Condition("ts1", hybridse::node::FnOperator::kFnOpGe, "10", type::DataType::kBigInt)}, - DeleteOption({}, "ts1", std::nullopt, 9)), + DeleteOption(std::nullopt, "", "ts1", std::nullopt, 9)), DeleteOptionParm({Condition("ts1", hybridse::node::FnOperator::kFnOpGe, "10", type::DataType::kBigInt), Condition("ts1", hybridse::node::FnOperator::kFnOpGe, "11", type::DataType::kBigInt)}, - DeleteOption({}, "ts1", std::nullopt, 10)), + DeleteOption(std::nullopt, "", "ts1", std::nullopt, 10)), DeleteOptionParm({Condition("ts1", hybridse::node::FnOperator::kFnOpGe, "10", type::DataType::kBigInt), Condition("ts1", hybridse::node::FnOperator::kFnOpGe, "11", type::DataType::kBigInt)}, - DeleteOption({}, "ts1", std::nullopt, 10)), + DeleteOption(std::nullopt, "", "ts1", std::nullopt, 10)), DeleteOptionParm({Condition("ts1", hybridse::node::FnOperator::kFnOpGe, "10", type::DataType::kBigInt), Condition("ts1", hybridse::node::FnOperator::kFnOpLt, "20", type::DataType::kBigInt)}, - DeleteOption({}, "ts1", 19, 9)), + DeleteOption(std::nullopt, "", "ts1", 19, 9)), DeleteOptionParm({Condition("ts1", hybridse::node::FnOperator::kFnOpGt, "10", type::DataType::kBigInt), Condition("ts1", hybridse::node::FnOperator::kFnOpLt, "20", type::DataType::kBigInt)}, - DeleteOption({}, "ts1", 19, 10)), + DeleteOption(std::nullopt, "", "ts1", 19, 10)), DeleteOptionParm({Condition("ts1", hybridse::node::FnOperator::kFnOpGt, "10", type::DataType::kBigInt), Condition("ts1", hybridse::node::FnOperator::kFnOpLe, "20", type::DataType::kBigInt)}, - DeleteOption({}, "ts1", 20, 10)), + DeleteOption(std::nullopt, "", "ts1", 20, 10)), DeleteOptionParm({Condition("card", 
hybridse::node::FnOperator::kFnOpEq, "key1", type::DataType::kString), Condition("ts1", hybridse::node::FnOperator::kFnOpGt, "10", type::DataType::kBigInt), Condition("ts1", hybridse::node::FnOperator::kFnOpLe, "20", type::DataType::kBigInt)}, - DeleteOption({{0, "key1"}}, "ts1", 20, 10)) + DeleteOption(0, "key1", "ts1", 20, 10)) }; INSTANTIATE_TEST_SUITE_P(NodeAdapter, DeleteOptionTest, testing::ValuesIn(option_cases)); diff --git a/src/sdk/option.h b/src/sdk/option.h new file mode 100644 index 00000000000..3acb4e30afa --- /dev/null +++ b/src/sdk/option.h @@ -0,0 +1,41 @@ +/* + * Copyright 2021 4Paradigm + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef SRC_SDK_OPTION_H_ +#define SRC_SDK_OPTION_H_ + +#include + +namespace openmldb { +namespace sdk { + +struct DeleteOption { + DeleteOption(std::optional idx_i, const std::string& key_i, const std::string& ts_name_i, + std::optional start_ts_i, std::optional end_ts_i) : + idx(idx_i), key(key_i), ts_name(ts_name_i), start_ts(start_ts_i), end_ts(end_ts_i) {} + DeleteOption() = default; + std::optional idx = std::nullopt; + std::string key; + std::string ts_name; + std::optional start_ts = std::nullopt; + std::optional end_ts = std::nullopt; + bool enable_decode_value = true; +}; + +} // namespace sdk +} // namespace openmldb + +#endif // SRC_SDK_OPTION_H_ diff --git a/src/sdk/sql_cluster_router.cc b/src/sdk/sql_cluster_router.cc index 2d6bba57df1..3dc11369fea 100644 --- a/src/sdk/sql_cluster_router.cc +++ b/src/sdk/sql_cluster_router.cc @@ -3507,14 +3507,13 @@ hybridse::sdk::Status SQLClusterRouter::HandleDelete(const std::string& db, cons if (!status.IsOK()) { return status; } - status = SendDeleteRequst(table_info, &option); + status = SendDeleteRequst(table_info, option); if (status.IsOK() && db != nameserver::INTERNAL_DB) { status = { StatusCode::kOk, "DELETE is a dangerous operation. Once deleted, it is very difficult to recover. 
You may also note that:\n" "- The deleted data will not be released immediately from the main memory; " "it remains until after a garbage collection interval (gc_interval)\n" - "- Data in the pre-aggregation table will not be updated.\n" "Please refer to this link for more details: " + base::NOTICE_URL}; } @@ -3522,8 +3521,8 @@ hybridse::sdk::Status SQLClusterRouter::HandleDelete(const std::string& db, cons } hybridse::sdk::Status SQLClusterRouter::SendDeleteRequst( - const std::shared_ptr<::openmldb::nameserver::TableInfo>& table_info, const DeleteOption* option) { - if (option->index_map.empty()) { + const std::shared_ptr<::openmldb::nameserver::TableInfo>& table_info, const DeleteOption& option) { + if (!option.idx.has_value()) { std::vector> tablets; if (!cluster_sdk_->GetTablet(table_info->db(), table_info->name(), &tablets)) { return {StatusCode::kCmdError, "get tablet failed"}; @@ -3533,39 +3532,29 @@ hybridse::sdk::Status SQLClusterRouter::SendDeleteRequst( return {StatusCode::kCmdError, "cannot connect tablet"}; } } - for (size_t idx = 0; idx < tablets.size(); idx++) { - auto tablet_client = tablets.at(idx)->GetClient(); - if (auto status = tablet_client->Delete(table_info->tid(), idx, option->index_map, option->ts_name, - option->start_ts, option->end_ts); - !status.OK()) { - return {StatusCode::kCmdError, status.GetMsg()}; - } - } - } else { - std::map> pid_index_map; - for (const auto& kv : option->index_map) { - uint32_t pid = ::openmldb::base::hash64(kv.second) % table_info->table_partition_size(); - auto iter = pid_index_map.find(pid); - if (iter == pid_index_map.end()) { - iter = pid_index_map.emplace(pid, std::map()).first; - } - iter->second.emplace(kv.first, kv.second); - } - for (const auto& kv : pid_index_map) { - auto tablet = cluster_sdk_->GetTablet(table_info->db(), table_info->name(), kv.first); - if (!tablet) { - return {StatusCode::kCmdError, "cannot connect tablet"}; - } - auto tablet_client = tablet->GetClient(); + for (size_t pid = 0; 
pid < tablets.size(); pid++) { + auto tablet_client = tablets.at(pid)->GetClient(); if (!tablet_client) { return {StatusCode::kCmdError, "tablet client is null"}; } - auto ret = tablet_client->Delete(table_info->tid(), kv.first, kv.second, option->ts_name, option->start_ts, - option->end_ts); + auto ret = tablet_client->Delete(table_info->tid(), pid, option, options_->request_timeout); if (!ret.OK()) { return {StatusCode::kCmdError, ret.GetMsg()}; } } + } else { + uint32_t pid = ::openmldb::base::hash64(option.key) % table_info->table_partition_size(); + auto tablet = cluster_sdk_->GetTablet(table_info->db(), table_info->name(), pid); + if (!tablet) { + return {StatusCode::kCmdError, "cannot connect tablet"}; + } + auto tablet_client = tablet->GetClient(); + if (!tablet_client) { + return {StatusCode::kCmdError, "tablet client is null"}; + } + if (auto ret = tablet_client->Delete(table_info->tid(), pid, option, options_->request_timeout); !ret.OK()) { + return {StatusCode::kCmdError, ret.GetMsg()}; + } } return {}; } @@ -3589,7 +3578,7 @@ bool SQLClusterRouter::ExecuteDelete(std::shared_ptr row, hybridse if (!status->IsOK()) { return false; } - *status = SendDeleteRequst(table_info, &option); + *status = SendDeleteRequst(table_info, option); return status->IsOK(); } @@ -4878,7 +4867,9 @@ ::hybridse::sdk::Status SQLClusterRouter::RevertPut(const nameserver::TableInfo& } std::map index_val = {{val.second, val.first}}; uint64_t end_ts = cur_ts > 0 ? 
cur_ts - 1 : 0; - client->Delete(table_info.tid(), kv.first, index_val, "", cur_ts, end_ts); + DeleteOption option(val.second, val.first, "", cur_ts, end_ts); + option.enable_decode_value = false; + client->Delete(table_info.tid(), kv.first, option, options_->request_timeout); } if (kv.first == end_pid) { break; diff --git a/src/sdk/sql_cluster_router.h b/src/sdk/sql_cluster_router.h index 934e5e4b9cc..1226ee4f987 100644 --- a/src/sdk/sql_cluster_router.h +++ b/src/sdk/sql_cluster_router.h @@ -375,7 +375,7 @@ class SQLClusterRouter : public SQLRouter { const hybridse::node::ExprNode* condition); hybridse::sdk::Status SendDeleteRequst(const std::shared_ptr& table_info, - const DeleteOption* option); + const DeleteOption& option); hybridse::sdk::Status HandleIndex(const std::string& db, const std::set>& table_pair, diff --git a/src/sdk/sql_cluster_test.cc b/src/sdk/sql_cluster_test.cc index c3bb0f08e9d..ee421a27ecb 100644 --- a/src/sdk/sql_cluster_test.cc +++ b/src/sdk/sql_cluster_test.cc @@ -329,47 +329,6 @@ TEST_F(SQLClusterDDLTest, CreateIndexCheck) { ASSERT_TRUE(router->DropDB(db, &status)); } -TEST_F(SQLClusterDDLTest, TestDelete) { - std::string name = "test" + GenRand(); - ::hybridse::sdk::Status status; - std::string ddl; - - std::string db = "db" + GenRand(); - ASSERT_TRUE(router->CreateDB(db, &status)); - ddl = absl::StrCat("create table ", name, - "(col1 string, col2 string, col3 string, col4 bigint, col5 bigint, col6 bigint, col7 string," - "index(key=col1, ts=col4), index(key=(col1, col2), ts=col4), index(key=col3, ts=col5));"); - ASSERT_TRUE(router->ExecuteDDL(db, ddl, &status)) << "ddl: " << ddl; - ASSERT_TRUE(router->RefreshCatalog()); - router->ExecuteSQL(db, "insert into " + name + " values ('a', 'aa', 'aaa', 100, 101, 102, 'xx');", &status); - router->ExecuteSQL(db, "insert into " + name + " values ('b', 'bb', 'bbb', 200, 201, 202, 'xx');", &status); - auto rs = router->ExecuteSQL(db, "select * from " + name + ";", &status); - ASSERT_EQ(rs->Size(), 
2); - rs = router->ExecuteSQL(db, "delete from " + name + " where col1 = 'xxx' and col5 > 100;", &status); - ASSERT_FALSE(status.IsOK()); - rs = router->ExecuteSQL(db, "delete from " + name + " where col1 = 'xxx' and col6 > 100;", &status); - ASSERT_FALSE(status.IsOK()); - rs = router->ExecuteSQL(db, "delete from " + name + " where col1 = 'xxx' and col3 = 'aaa';", &status); - ASSERT_FALSE(status.IsOK()); - rs = router->ExecuteSQL(db, "delete from " + name + " where col7 = 'xxx' and col3 = 'aaa';", &status); - ASSERT_FALSE(status.IsOK()); - router->ExecuteSQL(db, "delete from " + name + " where col6 > 100;", &status); - ASSERT_FALSE(status.IsOK()); - router->ExecuteSQL(db, "delete from " + name + " where col4 > 100 and col5 = 200;", &status); - ASSERT_FALSE(status.IsOK()); - router->ExecuteSQL(db, "delete from " + name + " where col5 > 100;", &status); - ASSERT_TRUE(status.IsOK()) << status.msg; - rs = router->ExecuteSQL(db, "select * from " + name + ";", &status); - ASSERT_EQ(rs->Size(), 2); - router->ExecuteSQL(db, "delete from " + name + " where col4 > 100;", &status); - ASSERT_TRUE(status.IsOK()); - rs = router->ExecuteSQL(db, "select * from " + name + ";", &status); - ASSERT_EQ(rs->Size(), 1); - - ASSERT_TRUE(router->ExecuteDDL(db, "drop table " + name + ";", &status)); - ASSERT_TRUE(router->DropDB(db, &status)); -} - TEST_F(SQLClusterDDLTest, ColumnDefaultValue) { std::string name = "test" + GenRand(); ::hybridse::sdk::Status status; diff --git a/src/storage/disk_table.h b/src/storage/disk_table.h index 7b471bac45e..9e207dc831d 100644 --- a/src/storage/disk_table.h +++ b/src/storage/disk_table.h @@ -185,9 +185,6 @@ class DiskTable : public Table { base::Status Truncate(); - bool Delete(uint32_t idx, const std::string& pk, const std::optional& start_ts, - const std::optional& end_ts) override; - uint64_t GetExpireTime(const TTLSt& ttl_st) override; uint64_t GetRecordCnt() override { @@ -240,6 +237,10 @@ class DiskTable : public Table { private: base::Status 
Delete(uint32_t idx, const std::string& pk, uint64_t start_ts, const std::optional& end_ts); + bool Delete(uint32_t idx, const std::string& pk, + const std::optional& start_ts, const std::optional& end_ts) override; + + private: rocksdb::DB* db_; rocksdb::WriteOptions write_opts_; diff --git a/src/storage/mem_table.cc b/src/storage/mem_table.cc index 3a57ffc4e93..91148eef09b 100644 --- a/src/storage/mem_table.cc +++ b/src/storage/mem_table.cc @@ -145,10 +145,6 @@ absl::Status MemTable::Put(uint64_t time, const std::string& value, const Dimens PDLOG(WARNING, "empty dimension. tid %u pid %u", id_, pid_); return absl::InvalidArgumentError(absl::StrCat(id_, ".", pid_, ": empty dimension")); } - if (value.length() < codec::HEADER_LENGTH) { - PDLOG(WARNING, "invalid value. tid %u pid %u", id_, pid_); - return absl::InvalidArgumentError(absl::StrCat(id_, ".", pid_, ": invalid value")); - } // inner index pos: -1 means invalid, so it's positive in inner_index_key_map std::map inner_index_key_map; for (auto iter = dimensions.begin(); iter != dimensions.end(); iter++) { @@ -161,9 +157,15 @@ absl::Status MemTable::Put(uint64_t time, const std::string& value, const Dimens uint32_t real_ref_cnt = 0; const int8_t* data = reinterpret_cast(value.data()); std::string uncompress_data; + uint32_t data_length = value.length(); if (GetCompressType() == openmldb::type::kSnappy) { snappy::Uncompress(value.data(), value.size(), &uncompress_data); data = reinterpret_cast(uncompress_data.data()); + data_length = uncompress_data.length(); + } + if (data_length < codec::HEADER_LENGTH) { + PDLOG(WARNING, "invalid value. 
tid %u pid %u", id_, pid_); + return absl::InvalidArgumentError(absl::StrCat(id_, ".", pid_, ": invalid value")); } uint8_t version = codec::RowView::GetSchemaVersion(data); auto decoder = GetVersionDecoder(version); diff --git a/src/storage/mem_table.h b/src/storage/mem_table.h index e85762a97dc..c84863eb1c7 100644 --- a/src/storage/mem_table.h +++ b/src/storage/mem_table.h @@ -60,8 +60,6 @@ class MemTable : public Table { const ::google::protobuf::RepeatedPtrField<::openmldb::api::BulkLoadIndex>& indexes); bool Delete(const ::openmldb::api::LogEntry& entry) override; - bool Delete(uint32_t idx, const std::string& key, const std::optional& start_ts, - const std::optional& end_ts); // use the first demission TableIterator* NewIterator(const std::string& pk, Ticket& ticket) override; @@ -112,6 +110,9 @@ class MemTable : public Table { bool CheckLatest(uint32_t index_id, const std::string& key, uint64_t ts); + bool Delete(uint32_t idx, const std::string& key, + const std::optional& start_ts, const std::optional& end_ts); + private: uint32_t seg_cnt_; std::vector segments_; diff --git a/src/storage/schema.h b/src/storage/schema.h index 39359761ed9..7be52edfcb0 100644 --- a/src/storage/schema.h +++ b/src/storage/schema.h @@ -32,9 +32,15 @@ namespace openmldb::storage { static constexpr uint32_t MAX_INDEX_NUM = 200; static constexpr uint32_t DEFAULT_TS_COL_ID = UINT32_MAX; -static constexpr const char* DEFAULT_TS_COL_NAME = "default_ts"; - -enum TTLType { kAbsoluteTime = 1, kRelativeTime = 2, kLatestTime = 3, kAbsAndLat = 4, kAbsOrLat = 5 }; +static constexpr const char* DEFAULT_TS_COL_NAME = "___default_ts___"; + +enum TTLType { + kAbsoluteTime = 1, + kRelativeTime = 2, + kLatestTime = 3, + kAbsAndLat = 4, + kAbsOrLat = 5 +}; // ttl unit: millisecond struct TTLSt { diff --git a/src/storage/segment.cc b/src/storage/segment.cc index 8255d27b7bd..1734dc022c5 100644 --- a/src/storage/segment.cc +++ b/src/storage/segment.cc @@ -257,10 +257,11 @@ bool Segment::Put(const 
Slice& key, const std::map& ts_map, D } bool Segment::Delete(const std::optional& idx, const Slice& key) { + uint32_t ts_idx = 0; + if (!GetTsIdx(idx, &ts_idx)) { + return false; + } if (ts_cnt_ == 1) { - if (idx.has_value() && ts_idx_map_.find(idx.value()) == ts_idx_map_.end()) { - return false; - } ::openmldb::base::Node* entry_node = nullptr; { std::lock_guard lock(mu_); @@ -271,13 +272,6 @@ bool Segment::Delete(const std::optional& idx, const Slice& key) { return true; } } else { - if (!idx.has_value()) { - return false; - } - auto iter = ts_idx_map_.find(idx.value()); - if (iter == ts_idx_map_.end()) { - return false; - } base::Node* data_node = nullptr; { std::lock_guard lock(mu_); @@ -285,7 +279,7 @@ bool Segment::Delete(const std::optional& idx, const Slice& key) { if (entries_->Get(key, entry_arr) < 0 || entry_arr == nullptr) { return true; } - KeyEntry* key_entry = reinterpret_cast(entry_arr)[iter->second]; + KeyEntry* key_entry = reinterpret_cast(entry_arr)[ts_idx]; std::unique_ptr it(key_entry->entries.NewIterator()); it->SeekToFirst(); if (it->Valid()) { @@ -294,25 +288,18 @@ bool Segment::Delete(const std::optional& idx, const Slice& key) { } } if (data_node != nullptr) { - node_cache_.AddValueNodeList(iter->second, gc_version_.load(std::memory_order_relaxed), data_node); + node_cache_.AddValueNodeList(ts_idx, gc_version_.load(std::memory_order_relaxed), data_node); } } return true; } -bool Segment::Delete(const std::optional& idx, const Slice& key, uint64_t ts, - const std::optional& end_ts) { - void* entry = nullptr; - if (entries_->Get(key, entry) < 0 || entry == nullptr) { - return true; - } - KeyEntry* key_entry = nullptr; - uint32_t ts_idx = 0; +bool Segment::GetTsIdx(const std::optional& idx, uint32_t* ts_idx) { + *ts_idx = 0; if (ts_cnt_ == 1) { if (idx.has_value() && ts_idx_map_.find(idx.value()) == ts_idx_map_.end()) { return false; } - key_entry = reinterpret_cast(entry); } else { if (!idx.has_value()) { return false; @@ -321,8 +308,55 @@ 
bool Segment::Delete(const std::optional& idx, const Slice& key, uint6 if (iter == ts_idx_map_.end()) { return false; } - key_entry = reinterpret_cast(entry)[iter->second]; - ts_idx = iter->second; + *ts_idx = iter->second; + } + return true; +} + +bool Segment::Delete(const std::optional& idx, const Slice& key, uint64_t ts) { + uint32_t ts_idx = 0; + if (!GetTsIdx(idx, &ts_idx)) { + return false; + } + void* entry = nullptr; + if (entries_->Get(key, entry) < 0 || entry == nullptr) { + return true; + } + KeyEntry* key_entry = nullptr; + if (ts_cnt_ == 1) { + key_entry = reinterpret_cast(entry); + } else { + key_entry = reinterpret_cast(entry)[ts_idx]; + } + base::Node* data_node = nullptr; + { + std::lock_guard lock(mu_); + data_node = key_entry->entries.Remove(ts); + } + if (data_node) { + node_cache_.AddSingleValueNode(ts_idx, gc_version_.load(std::memory_order_relaxed), data_node); + } + return true; +} + +bool Segment::Delete(const std::optional& idx, const Slice& key, + uint64_t ts, const std::optional& end_ts) { + if (end_ts.has_value() && end_ts.value() + 1 == ts) { + return Delete(idx, key, ts); + } + uint32_t ts_idx = 0; + if (!GetTsIdx(idx, &ts_idx)) { + return false; + } + void* entry = nullptr; + if (entries_->Get(key, entry) < 0 || entry == nullptr) { + return true; + } + KeyEntry* key_entry = nullptr; + if (ts_cnt_ == 1) { + key_entry = reinterpret_cast(entry); + } else { + key_entry = reinterpret_cast(entry)[ts_idx]; } if (end_ts.has_value()) { if (auto node = key_entry->entries.GetLast(); node == nullptr) { diff --git a/src/storage/segment.h b/src/storage/segment.h index 11322483832..42f5ad6946c 100644 --- a/src/storage/segment.h +++ b/src/storage/segment.h @@ -81,8 +81,9 @@ class Segment { bool Put(const Slice& key, const std::map& ts_map, DataBlock* row, bool put_if_absent = false); bool Delete(const std::optional& idx, const Slice& key); - bool Delete(const std::optional& idx, const Slice& key, uint64_t ts, - const std::optional& end_ts); + bool 
Delete(const std::optional& idx, const Slice& key, uint64_t ts); + bool Delete(const std::optional& idx, const Slice& key, + uint64_t ts, const std::optional& end_ts); void Release(StatisticsInfo* statistics_info); @@ -144,6 +145,7 @@ class Segment { private: void FreeList(uint32_t ts_idx, ::openmldb::base::Node* node, StatisticsInfo* statistics_info); void SplitList(KeyEntry* entry, uint64_t ts, ::openmldb::base::Node** node); + bool GetTsIdx(const std::optional& idx, uint32_t* ts_idx); bool ListContains(KeyEntry* entry, uint64_t time, DataBlock* row, bool check_all_time); diff --git a/src/tablet/tablet_impl.cc b/src/tablet/tablet_impl.cc index 8b2b446e874..42e68db04c2 100644 --- a/src/tablet/tablet_impl.cc +++ b/src/tablet/tablet_impl.cc @@ -102,6 +102,7 @@ DECLARE_bool(use_name); DECLARE_bool(enable_distsql); DECLARE_string(snapshot_compression); DECLARE_string(file_compression); +DECLARE_int32(request_timeout_ms); // cluster config DECLARE_string(endpoint); @@ -626,17 +627,9 @@ void TabletImpl::Get(RpcController* controller, const ::openmldb::api::GetReques } else { pid = request->pid(); } - std::shared_ptr table = GetTable(tid, pid); - if (!table) { - PDLOG(WARNING, "table does not exist. tid %u, pid %u", tid, pid); - response->set_code(::openmldb::base::ReturnCode::kTableIsNotExist); - response->set_msg("table does not exist"); - return; - } - if (table->GetTableStat() == ::openmldb::storage::kLoading) { - PDLOG(WARNING, "table is loading. 
tid %u, pid %u", tid, pid); - response->set_code(::openmldb::base::ReturnCode::kTableIsLoading); - response->set_msg("table is loading"); + auto table = GetTable(tid, pid); + if (auto status = CheckTable(tid, pid, false, table); !status.OK()) { + SetResponseStatus(status, response); return; } std::string index_name; @@ -714,43 +707,21 @@ void TabletImpl::Put(RpcController* controller, const ::openmldb::api::PutReques } uint32_t tid = request->tid(); uint32_t pid = request->pid(); - uint64_t start_time = ::baidu::common::timer::get_micros(); - std::shared_ptr
table = GetTable(tid, pid); - if (!table) { - PDLOG(WARNING, "table does not exist. tid %u, pid %u", tid, pid); - response->set_code(::openmldb::base::ReturnCode::kTableIsNotExist); - response->set_msg("table does not exist"); + auto table = GetTable(tid, pid); + if (auto status = CheckTable(tid, pid, true, table); !status.OK()) { + SetResponseStatus(status, response); return; } + uint64_t start_time = ::baidu::common::timer::get_micros(); DLOG(INFO) << "request dimension size " << request->dimensions_size() << " request time " << request->time(); - if (!table->IsLeader()) { - response->set_code(::openmldb::base::ReturnCode::kTableIsFollower); - response->set_msg("table is follower"); + if (table->GetStorageMode() == ::openmldb::common::StorageMode::kMemory && + memory_used_.load(std::memory_order_relaxed) > FLAGS_max_memory_mb) { + PDLOG(WARNING, "current memory %lu MB exceed max memory limit %lu MB. tid %u, pid %u", + memory_used_.load(std::memory_order_relaxed), FLAGS_max_memory_mb, tid, pid); + response->set_code(::openmldb::base::ReturnCode::kExceedMaxMemory); + response->set_msg("exceed max memory"); return; } - if (table->GetTableStat() == ::openmldb::storage::kLoading) { - PDLOG(WARNING, "table is loading. tid %u, pid %u", tid, pid); - response->set_code(::openmldb::base::ReturnCode::kTableIsLoading); - response->set_msg("table is loading"); - return; - } - if (table->GetStorageMode() == ::openmldb::common::StorageMode::kMemory) { - if (memory_used_.load(std::memory_order_relaxed) > FLAGS_max_memory_mb) { - PDLOG(WARNING, "current memory %lu MB exceed max memory limit %lu MB. 
tid %u, pid %u", - memory_used_.load(std::memory_order_relaxed), FLAGS_max_memory_mb, tid, pid); - response->set_code(base::ReturnCode::kExceedMaxMemory); - response->set_msg("exceed max memory"); - return; - } - if (request->has_memory_limit() && request->memory_limit() > 0 - && system_memory_usage_rate_.load(std::memory_order_relaxed) > request->memory_limit()) { - PDLOG(WARNING, "current system_memory_usage_rate %u exceed request memory limit %u. tid %u, pid %u", - system_memory_usage_rate_.load(std::memory_order_relaxed), request->memory_limit(), tid, pid); - response->set_code(base::ReturnCode::kExceedPutMemoryLimit); - response->set_msg("exceed memory limit"); - return; - } - } ::openmldb::api::LogEntry entry; entry.set_pk(request->pk()); entry.set_ts(request->time()); @@ -1159,17 +1130,9 @@ void TabletImpl::Scan(RpcController* controller, const ::openmldb::api::ScanRequ } else { pid = request->pid(); } - std::shared_ptr
table = GetTable(tid, pid); - if (!table) { - PDLOG(WARNING, "table does not exist. tid %u, pid %u", tid, pid); - response->set_code(::openmldb::base::ReturnCode::kTableIsNotExist); - response->set_msg("table does not exist"); - return; - } - if (table->GetTableStat() == ::openmldb::storage::kLoading) { - PDLOG(WARNING, "table is loading. tid %u, pid %u", tid, pid); - response->set_code(::openmldb::base::ReturnCode::kTableIsLoading); - response->set_msg("table is loading"); + auto table = GetTable(tid, pid); + if (auto status = CheckTable(tid, pid, false, table); !status.OK()) { + SetResponseStatus(status, response); return; } uint32_t index = 0; @@ -1253,17 +1216,11 @@ void TabletImpl::Scan(RpcController* controller, const ::openmldb::api::ScanRequ void TabletImpl::Count(RpcController* controller, const ::openmldb::api::CountRequest* request, ::openmldb::api::CountResponse* response, Closure* done) { brpc::ClosureGuard done_guard(done); - std::shared_ptr
table = GetTable(request->tid(), request->pid()); - if (!table) { - PDLOG(WARNING, "table does not exist. tid %u, pid %u", request->tid(), request->pid()); - response->set_code(::openmldb::base::ReturnCode::kTableIsNotExist); - response->set_msg("table does not exist"); - return; - } - if (table->GetTableStat() == ::openmldb::storage::kLoading) { - PDLOG(WARNING, "table is loading. tid %u, pid %u", request->tid(), request->pid()); - response->set_code(::openmldb::base::ReturnCode::kTableIsLoading); - response->set_msg("table is loading"); + uint32_t tid = request->tid(); + uint32_t pid = request->pid(); + auto table = GetTable(tid, pid); + if (auto status = CheckTable(tid, pid, false, table); !status.OK()) { + SetResponseStatus(status, response); return; } uint32_t index = 0; @@ -1277,8 +1234,7 @@ void TabletImpl::Count(RpcController* controller, const ::openmldb::api::CountRe } index_def = table->GetIndex(index_name); if (!index_def || !index_def->IsReady()) { - PDLOG(WARNING, "idx name %s not found in table tid %u, pid %u", request->idx_name().c_str(), request->tid(), - request->pid()); + PDLOG(WARNING, "idx name %s not found in table tid %u, pid %u", request->idx_name().c_str(), tid, pid); response->set_code(::openmldb::base::ReturnCode::kIdxNameNotFound); response->set_msg("idx name not found"); return; @@ -1337,17 +1293,9 @@ void TabletImpl::Traverse(RpcController* controller, const ::openmldb::api::Trav brpc::ClosureGuard done_guard(done); uint32_t tid = request->tid(); uint32_t pid = request->pid(); - std::shared_ptr
table = GetTable(tid, pid); - if (!table) { - PDLOG(WARNING, "table does not exist. tid %u, pid %u", tid, pid); - response->set_code(::openmldb::base::ReturnCode::kTableIsNotExist); - response->set_msg("table does not exist"); - return; - } - if (table->GetTableStat() == ::openmldb::storage::kLoading) { - PDLOG(WARNING, "table is loading. tid %u, pid %u", tid, pid); - response->set_code(::openmldb::base::ReturnCode::kTableIsLoading); - response->set_msg("table is loading"); + auto table = GetTable(tid, pid); + if (auto status = CheckTable(tid, pid, false, table); !status.OK()) { + SetResponseStatus(status, response); return; } std::string index_name; @@ -1458,6 +1406,140 @@ void TabletImpl::Traverse(RpcController* controller, const ::openmldb::api::Trav response->set_ts_pos(ts_pos); } +base::Status TabletImpl::CheckTable(uint32_t tid, uint32_t pid, bool check_leader, + const std::shared_ptr
& table) { + if (!table) { + PDLOG(WARNING, "table does not exist. tid %u, pid %u", tid, pid); + return {base::ReturnCode::kTableIsNotExist, "table does not exist"}; + } + if (check_leader && !table->IsLeader()) { + DEBUGLOG("table is follower. tid %u, pid %u", tid, pid); + return {base::ReturnCode::kTableIsFollower, "table is follower"}; + } + if (table->GetTableStat() == ::openmldb::storage::kLoading) { + PDLOG(WARNING, "table is loading. tid %u, pid %u", tid, pid); + return {base::ReturnCode::kTableIsLoading, "table is loading"}; + } + return {}; +} + +base::Status TabletImpl::DeleteAllIndex(const std::shared_ptr& table, + const std::shared_ptr& cur_index, + const std::string& key, + std::optional start_ts, + std::optional end_ts, + bool skip_cur_ts_col, + const std::shared_ptr& client_manager, + uint32_t partition_num) { + storage::Ticket ticket; + std::unique_ptr iter(table->NewIterator(cur_index->GetId(), key, ticket)); + if (start_ts.has_value()) { + iter->Seek(start_ts.value()); + } else { + iter->SeekToFirst(); + } + auto indexs = table->GetAllIndex(); + while (iter->Valid()) { + DEBUGLOG("cur ts %lu cur index pos %u", iter->GetKey(), cur_index->GetId()); + if (end_ts.has_value() && iter->GetKey() <= end_ts.value()) { + break; + } + auto value = iter->GetValue(); + uint32_t data_length = value.size(); + const int8_t* data = reinterpret_cast(value.data()); + std::string uncompress_data; + if (table->GetCompressType() == openmldb::type::kSnappy) { + snappy::Uncompress(value.data(), value.size(), &uncompress_data); + data = reinterpret_cast(uncompress_data.data()); + data_length = uncompress_data.length(); + } + if (data_length < codec::HEADER_LENGTH) { + return {base::ReturnCode::kDeleteFailed, "invalid value"}; + } + uint8_t version = codec::RowView::GetSchemaVersion(data); + auto decoder = table->GetVersionDecoder(version); + if (decoder == nullptr) { + return {base::ReturnCode::kDeleteFailed, "invalid schema version"}; + } + for (const auto& index : 
indexs) { + if (!index->IsReady()) { + continue; + } + if (cur_index && index->GetId() == cur_index->GetId()) { + continue; + } + auto ts_col = index->GetTsColumn(); + if (skip_cur_ts_col && ts_col->GetId() == cur_index->GetTsColumn()->GetId()) { + continue; + } + sdk::DeleteOption option; + option.idx = index->GetId(); + if (ts_col->IsAutoGenTs()) { + option.start_ts = iter->GetKey(); + } else { + int64_t ts = 0; + if (decoder->GetInteger(data, ts_col->GetId(), ts_col->GetType(), &ts) != 0) { + return {base::ReturnCode::kDeleteFailed, "get ts value failed"}; + } + option.ts_name = ts_col->GetName(); + option.start_ts = ts; + } + if (option.start_ts.value() > 1) { + option.end_ts = option.start_ts.value() - 1; + } + const auto& cols = index->GetColumns(); + if (cols.size() == 1) { + const auto& col = cols.front(); + if (decoder->IsNULL(data, col.GetId())) { + option.key = hybridse::codec::NONETOKEN; + } else if (decoder->GetStrValue(data, col.GetId(), &option.key) != 0) { + return {base::ReturnCode::kDeleteFailed, "get key failed"}; + } + if (option.key.empty()) { + option.key = hybridse::codec::EMPTY_STRING; + } + } else { + for (const auto& col : cols) { + std::string tmp; + if (decoder->IsNULL(data, col.GetId())) { + tmp = hybridse::codec::NONETOKEN; + } else if (decoder->GetStrValue(data, col.GetId(), &tmp) != 0) { + return {base::ReturnCode::kDeleteFailed, "get key failed"}; + } + if (tmp.empty()) { + tmp = hybridse::codec::EMPTY_STRING; + } + if (!option.key.empty()) { + option.key.append("|"); + } + option.key.append(tmp); + } + } + uint32_t cur_pid = static_cast(base::hash64(option.key)) % partition_num; + auto tablet = client_manager->GetTablet(cur_pid); + if (tablet == nullptr) { + return {base::ReturnCode::kDeleteFailed, absl::StrCat("tablet is nullptr, pid ", cur_pid)}; + } + auto client = tablet->GetClient(); + if (client == nullptr) { + return {base::ReturnCode::kDeleteFailed, absl::StrCat("client is nullptr, pid ", cur_pid)}; + } + DEBUGLOG("delete 
idx %u pid %u pk %s ts %lu end_ts %lu", + option.idx.value(), cur_pid, option.key.c_str(), option.start_ts.value(), option.end_ts.value()); + std::string msg; + // do not delete other index data + option.enable_decode_value = false; + if (auto status = client->Delete(table->GetId(), cur_pid, option, FLAGS_request_timeout_ms); !status.OK()) { + return {base::ReturnCode::kDeleteFailed, + absl::StrCat("delete failed. key ", option.key, " pid ", cur_pid, " msg: ", status.GetMsg())}; + } + } + + iter->Next(); + } + return {}; +} + void TabletImpl::Delete(RpcController* controller, const ::openmldb::api::DeleteRequest* request, openmldb::api::GeneralResponse* response, Closure* done) { brpc::ClosureGuard done_guard(done); @@ -1468,23 +1550,9 @@ void TabletImpl::Delete(RpcController* controller, const ::openmldb::api::Delete response->set_msg("is follower cluster"); return; } - std::shared_ptr
table = GetTable(tid, pid); - if (!table) { - PDLOG(WARNING, "table does not exist. tid %u, pid %u", tid, pid); - response->set_code(::openmldb::base::ReturnCode::kTableIsNotExist); - response->set_msg("table does not exist"); - return; - } - if (!table->IsLeader()) { - DEBUGLOG("table is follower. tid %u, pid %u", tid, pid); - response->set_code(::openmldb::base::ReturnCode::kTableIsFollower); - response->set_msg("table is follower"); - return; - } - if (table->GetTableStat() == ::openmldb::storage::kLoading) { - PDLOG(WARNING, "table is loading. tid %u, pid %u", tid, pid); - response->set_code(::openmldb::base::ReturnCode::kTableIsLoading); - response->set_msg("table is loading"); + auto table = GetTable(tid, pid); + if (auto status = CheckTable(tid, pid, true, table); !status.OK()) { + SetResponseStatus(status, response); return; } auto replicator = GetReplicator(tid, pid); @@ -1495,21 +1563,24 @@ void TabletImpl::Delete(RpcController* controller, const ::openmldb::api::Delete ::openmldb::api::LogEntry entry; entry.set_term(replicator->GetLeaderTerm()); entry.set_method_type(::openmldb::api::MethodType::kDelete); - uint32_t idx = 0; if (request->dimensions_size() > 0) { - entry.mutable_dimensions()->CopyFrom(request->dimensions()); - idx = entry.dimensions(0).idx(); - } else { - if (request->has_idx_name() && request->idx_name().size() > 0) { - std::shared_ptr index_def = table->GetIndex(request->idx_name()); - if (!index_def || !index_def->IsReady()) { - PDLOG(WARNING, "idx name %s not found in table tid %u, pid %u", request->idx_name().c_str(), tid, pid); - response->set_code(::openmldb::base::ReturnCode::kIdxNameNotFound); - response->set_msg("idx name not found"); - return; - } - idx = index_def->GetId(); + entry.add_dimensions()->CopyFrom(request->dimensions(0)); + auto index_def = table->GetIndex(request->dimensions(0).idx()); + if (!index_def || !index_def->IsReady()) { + PDLOG(WARNING, "index %s not found in table tid %u, pid %u", 
request->dimensions(0).idx(), tid, pid); + response->set_code(::openmldb::base::ReturnCode::kIdxNameNotFound); + response->set_msg("index not found"); + return; } + } else if (request->has_idx_name() && !request->idx_name().empty()) { + auto index_def = table->GetIndex(request->idx_name()); + if (!index_def || !index_def->IsReady()) { + PDLOG(WARNING, "idx name %s not found in table tid %u, pid %u", request->idx_name().c_str(), tid, pid); + response->set_code(::openmldb::base::ReturnCode::kIdxNameNotFound); + response->set_msg("index not found"); + return; + } + uint32_t idx = index_def->GetId(); if (request->has_key()) { auto dimension = entry.add_dimensions(); dimension->set_key(request->key()); @@ -1531,8 +1602,27 @@ void TabletImpl::Delete(RpcController* controller, const ::openmldb::api::Delete PDLOG(WARNING, "invalid args. tid %u, pid %u", tid, pid); return; } + bool delete_others = false; + if (request->has_enable_decode_value() && request->enable_decode_value()) { + auto indexs = table->GetAllIndex(); + if (entry.dimensions_size() > 0) { + if (indexs.size() > 1) { + delete_others = true; + } + } else if (request->has_ts_name()) { + for (const auto& index : indexs) { + if (!index->IsReady()) { + continue; + } + if (index->GetTsColumn()->GetName() != request->ts_name()) { + delete_others = true; + break; + } + } + } + } auto aggrs = GetAggregators(tid, pid); - if (!aggrs) { + if (!aggrs && !delete_others) { if (table->Delete(entry)) { DEBUGLOG("delete ok. 
tid %u, pid %u, key %s", tid, pid, request->key().c_str()); } else { @@ -1541,7 +1631,7 @@ void TabletImpl::Delete(RpcController* controller, const ::openmldb::api::Delete return; } } else { - auto get_aggregator = [this](std::shared_ptr aggrs, uint32_t idx) -> std::shared_ptr { + auto get_aggregator = [this](const std::shared_ptr& aggrs, uint32_t idx) -> std::shared_ptr { if (aggrs) { for (const auto& aggr : *aggrs) { if (aggr->GetIndexPos() == idx) { @@ -1553,36 +1643,74 @@ void TabletImpl::Delete(RpcController* controller, const ::openmldb::api::Delete }; std::optional start_ts = entry.has_ts() ? std::optional{entry.ts()} : std::nullopt; std::optional end_ts = entry.has_end_ts() ? std::optional{entry.end_ts()} : std::nullopt; + auto handler = catalog_->GetTable(table->GetDB(), table->GetName()); + if (!handler) { + response->set_code(::openmldb::base::ReturnCode::kDeleteFailed); + response->set_msg("no TableHandler"); + PDLOG(WARNING, "no TableHandler. tid %u, pid %u", tid, pid); + return; + } + auto tablet_table_handler = std::dynamic_pointer_cast(handler); + if (!tablet_table_handler) { + response->set_code(::openmldb::base::ReturnCode::kDeleteFailed); + response->set_msg("convert TabletTableHandler failed"); + PDLOG(WARNING, "convert TabletTableHandler failed. 
tid %u, pid %u", tid, pid); + return; + } + uint32_t pid_num = tablet_table_handler->GetPartitionNum(); + auto table_client_manager = tablet_table_handler->GetTableClientManager(); if (entry.dimensions_size() > 0) { - for (const auto& dimension : entry.dimensions()) { - if (!table->Delete(dimension.idx(), dimension.key(), start_ts, end_ts)) { - response->set_code(::openmldb::base::ReturnCode::kDeleteFailed); - response->set_msg("delete failed"); + const auto& dimension = entry.dimensions(0); + uint32_t idx = dimension.idx(); + auto index_def = table->GetIndex(idx); + const auto& key = dimension.key(); + if (delete_others) { + auto status = DeleteAllIndex(table, index_def, key, start_ts, end_ts, false, + table_client_manager, pid_num); + if (!status.OK()) { + SET_RESP_AND_WARN(response, status.GetCode(), status.GetMsg()); return; } - auto aggr = get_aggregator(aggrs, dimension.idx()); - if (aggr) { - if (!aggr->Delete(dimension.key(), start_ts, end_ts)) { - PDLOG(WARNING, "delete from aggr failed. base table: tid[%u] pid[%u] index[%u] key[%s]. " - "aggr table: tid[%u]", - tid, pid, idx, dimension.key().c_str(), aggr->GetAggrTid()); - response->set_code(::openmldb::base::ReturnCode::kDeleteFailed); - response->set_msg("delete from associated pre-aggr table failed"); - return; - } + } + if (!table->Delete(idx, key, start_ts, end_ts)) { + response->set_code(::openmldb::base::ReturnCode::kDeleteFailed); + response->set_msg("delete failed"); + return; + } + auto aggr = get_aggregator(aggrs, idx); + if (aggr) { + if (!aggr->Delete(key, start_ts, end_ts)) { + PDLOG(WARNING, "delete from aggr failed. base table: tid[%u] pid[%u] index[%u] key[%s]. " + "aggr table: tid[%u]", + tid, pid, idx, key.c_str(), aggr->GetAggrTid()); + response->set_code(::openmldb::base::ReturnCode::kDeleteFailed); + response->set_msg("delete from associated pre-aggr table failed"); + return; } - DEBUGLOG("delete ok. 
tid %u, pid %u, key %s", tid, pid, dimension.key().c_str()); } + DEBUGLOG("delete ok. tid %u, pid %u, key %s", tid, pid, key.c_str()); } else { + bool is_first_hit_index = true; for (const auto& index_def : table->GetAllIndex()) { if (!index_def || !index_def->IsReady()) { continue; } + if (index_def->GetTsColumn()->GetName() != request->ts_name()) { + continue; + } uint32_t idx = index_def->GetId(); std::unique_ptr iter(table->NewTraverseIterator(idx)); iter->SeekToFirst(); while (iter->Valid()) { auto pk = iter->GetPK(); + if (delete_others && is_first_hit_index) { + auto status = DeleteAllIndex(table, index_def, pk, start_ts, end_ts, true, + table_client_manager, pid_num); + if (!status.OK()) { + SET_RESP_AND_WARN(response, status.GetCode(), status.GetMsg()); + return; + } + } iter->NextPK(); if (!table->Delete(idx, pk, start_ts, end_ts)) { response->set_code(::openmldb::base::ReturnCode::kDeleteFailed); @@ -1600,6 +1728,7 @@ void TabletImpl::Delete(RpcController* controller, const ::openmldb::api::Delete } } } + is_first_hit_index = false; } } } @@ -1946,16 +2075,9 @@ void TabletImpl::ChangeRole(RpcController* controller, const ::openmldb::api::Ch brpc::ClosureGuard done_guard(done); uint32_t tid = request->tid(); uint32_t pid = request->pid(); - std::shared_ptr
table = GetTable(tid, pid); - if (!table) { - response->set_code(::openmldb::base::ReturnCode::kTableIsNotExist); - response->set_msg("table does not exist"); - return; - } - if (table->GetTableStat() != ::openmldb::storage::kNormal) { - PDLOG(WARNING, "table state[%u] can not change role. tid[%u] pid[%u]", table->GetTableStat(), tid, pid); - response->set_code(::openmldb::base::ReturnCode::kTableStatusIsNotKnormal); - response->set_msg("table status is not kNormal"); + auto table = GetTable(tid, pid); + if (auto status = CheckTable(tid, pid, false, table); !status.OK()) { + SetResponseStatus(status, response); return; } std::shared_ptr replicator = GetReplicator(tid, pid); @@ -2169,11 +2291,9 @@ void TabletImpl::AppendEntries(RpcController* controller, const ::openmldb::api: brpc::ClosureGuard done_guard(done); uint32_t tid = request->tid(); uint32_t pid = request->pid(); - std::shared_ptr
table = GetTable(tid, pid); - if (!table) { - PDLOG(WARNING, "table does not exist. tid %u, pid %u", tid, pid); - response->set_code(::openmldb::base::ReturnCode::kTableIsNotExist); - response->set_msg("table does not exist"); + auto table = GetTable(tid, pid); + if (auto status = CheckTable(tid, pid, false, table); !status.OK()) { + SetResponseStatus(status, response); return; } if (!follower_.load(std::memory_order_relaxed) && table->IsLeader()) { @@ -2182,12 +2302,6 @@ void TabletImpl::AppendEntries(RpcController* controller, const ::openmldb::api: response->set_msg("table is leader"); return; } - if (table->GetTableStat() == ::openmldb::storage::kLoading) { - response->set_code(::openmldb::base::ReturnCode::kTableIsLoading); - response->set_msg("table is loading"); - PDLOG(WARNING, "table is loading. tid %u, pid %u", tid, pid); - return; - } std::shared_ptr replicator = GetReplicator(tid, pid); if (!replicator) { response->set_code(::openmldb::base::ReturnCode::kReplicatorIsNotExist); diff --git a/src/tablet/tablet_impl.h b/src/tablet/tablet_impl.h index 833dbe5ff70..c6ea7cd2b21 100644 --- a/src/tablet/tablet_impl.h +++ b/src/tablet/tablet_impl.h @@ -433,9 +433,18 @@ class TabletImpl : public ::openmldb::api::TabletServer { openmldb::api::QueryResponse& response, butil::IOBuf& buf); // NOLINT void CreateProcedure(const std::shared_ptr& sp_info); + base::Status CheckTable(uint32_t tid, uint32_t pid, bool check_leader, const std::shared_ptr
& table); // refresh the pre-aggr tables info bool RefreshAggrCatalog(); + base::Status DeleteAllIndex(const std::shared_ptr& table, + const std::shared_ptr& cur_index, + const std::string& key, + std::optional start_ts, + std::optional end_ts, + bool skip_cur_ts_col, + const std::shared_ptr& client_manager, + uint32_t partition_num); void UpdateMemoryUsage(); diff --git a/src/tablet/tablet_impl_test.cc b/src/tablet/tablet_impl_test.cc index 1a2de9e66d8..ec9773ce6b3 100644 --- a/src/tablet/tablet_impl_test.cc +++ b/src/tablet/tablet_impl_test.cc @@ -176,6 +176,7 @@ void AddDefaultSchema(uint64_t abs_ttl, uint64_t lat_ttl, ::openmldb::type::TTLT } void AddDefaultAggregatorBaseSchema(::openmldb::api::TableMeta* table_meta) { + table_meta->set_db("db1"); table_meta->set_name("t0"); table_meta->set_pid(1); table_meta->set_mode(::openmldb::api::TableMode::kTableLeader); @@ -191,6 +192,7 @@ void AddDefaultAggregatorBaseSchema(::openmldb::api::TableMeta* table_meta) { } void AddDefaultAggregatorSchema(::openmldb::api::TableMeta* table_meta) { + table_meta->set_db("db1"); table_meta->set_name("pre_aggr_1"); table_meta->set_pid(1); table_meta->set_mode(::openmldb::api::TableMode::kTableLeader);