Commit

feat: delete data from all index (#3693)
dl239 authored Feb 22, 2024
1 parent 7b8a04a commit 8f19aeb
Showing 30 changed files with 680 additions and 499 deletions.
1 change: 0 additions & 1 deletion docs/en/openmldb_sql/dml/DELETE_STATEMENT.md
@@ -11,7 +11,6 @@ TableName ::=
```

**Description**
- The `DELETE` statement deletes data that meets the specified conditions from an online table, but not from every index: only the index related to the `WHERE` condition is affected. For more examples, see [function_boundary](../../quickstart/function_boundary.md#delete).
- The filter columns specified by `WHERE` must be index columns. If a column is a key column, only `=` can be used.

## Examples
57 changes: 0 additions & 57 deletions docs/en/quickstart/function_boundary.md
@@ -70,63 +70,6 @@ It is recommended to use HDFS files as source data. This approach allows for suc
- In local mode, TaskManager can successfully import source data only if the source data is placed on the same host as the TaskManager process.
- When TaskManager is in Yarn mode (both client and cluster), a file path cannot be used as the source data address because it is not known on which host the container is running.

### DELETE

In tables with multiple indexes in the online storage, a `DELETE` operation may not delete corresponding data in all indexes. Consequently, there may be situations where data has been deleted, but the deleted data can still be found.

For example:

```SQL
create database db;
use db;
create table t1(c1 int, c2 int,index(key=c1),index(key=c2));
desc t1;
set @@execute_mode='online';
insert into t1 values (1,1),(2,2);
delete from t1 where c2=2;
select * from t1;
select * from t1 where c2=2;
```

The results are as follows:

```Plain
--- ------- ------ ------ ---------
Field Type Null Default
--- ------- ------ ------ ---------
1 c1 Int YES
2 c2 Int YES
--- ------- ------ ------ ---------
--- -------------------- ------ ---- ------ ---------------
name keys ts ttl ttl_type
--- -------------------- ------ ---- ------ ---------------
1 INDEX_0_1668504212 c1 - 0min kAbsoluteTime
2 INDEX_1_1668504212 c2 - 0min kAbsoluteTime
--- -------------------- ------ ---- ------ ---------------
--------------
storage_mode
--------------
Memory
--------------
---- ----
c1 c2
---- ----
1 1
2 2
---- ----
2 rows in set
---- ----
c1 c2
---- ----
0 rows in set
```

Explanation:

Table `t1` has multiple indexes (which may be automatically created during `DEPLOY`). If you run `delete from t1 where c2=2`, it only deletes data in the second index, while the data in the first index remains unaffected. Therefore, if you subsequently run `select * from t1` and it uses the first index, there are two pieces of data that haven't been deleted. `select * from t1 where c2=2` uses the second index, and the result is empty, with data being successfully deleted.
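
The behavior explained above can be modeled with a toy table that keeps one copy of the rows per index. This is a minimal sketch, not OpenMLDB's storage engine: `Row`, `ToyTable`, and both delete methods are hypothetical names, and the "all indexes" variant only reflects what this commit's title suggests, not its actual implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <map>

// Hypothetical row type mirroring table t1(c1 int, c2 int).
struct Row {
    int c1;
    int c2;
};

// Hypothetical toy model: every index stores its own copy of each row,
// keyed by the value of that index's key column.
class ToyTable {
 public:
    void Insert(const Row& row) {
        by_c1_.emplace(row.c1, row);
        by_c2_.emplace(row.c2, row);
    }

    // Models the documented limitation: only the index that matches the
    // WHERE column (c2) is modified.
    void DeleteByC2SingleIndex(int c2) { by_c2_.erase(c2); }

    // Models deleting from every index: rows with the given c2 value are
    // removed from the c1 index as well.
    void DeleteByC2AllIndexes(int c2) {
        for (auto it = by_c1_.begin(); it != by_c1_.end();) {
            if (it->second.c2 == c2) {
                it = by_c1_.erase(it);
            } else {
                ++it;
            }
        }
        by_c2_.erase(c2);
    }

    // Stands in for `select * from t1`, assumed to scan the first index.
    std::size_t FullScanCount() const { return by_c1_.size(); }

    // Stands in for `select * from t1 where c2=?`, which uses the second index.
    std::size_t CountByC2(int c2) const { return by_c2_.count(c2); }

 private:
    std::multimap<int, Row> by_c1_;
    std::multimap<int, Row> by_c2_;
};
```

Inserting `(1,1)` and `(2,2)` and then calling `DeleteByC2SingleIndex(2)` leaves two rows visible to the full scan while the lookup by `c2` is empty, which matches the output shown above. With `DeleteByC2AllIndexes(2)` the full scan sees only one row.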

## DQL Boundary

The supported query modes (i.e. `SELECT` statements) vary depending on the execution mode:
1 change: 0 additions & 1 deletion docs/zh/openmldb_sql/dml/DELETE_STATEMENT.md
@@ -12,7 +12,6 @@ TableName ::=

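**Description**

- The `DELETE` statement deletes data that meets the specified conditions from an online table. Not all matching data in every index is deleted; only the indexes related to the where condition are affected. For an example, see [Function Boundary](../../quickstart/function_boundary.md#delete).
- The filter columns specified by `WHERE` must be index columns. If a column is a key column, only `=` can be used.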

## Examples
2 changes: 1 addition & 1 deletion docs/zh/openmldb_sql/notice.md
@@ -7,7 +7,7 @@
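| CREATE TABLE | 1. If no index is specified in the CREATE TABLE statement, an `absolute 0` index is created automatically by default. Data under this index never expires and may occupy a large amount of memory <br> 2. Disk tables of type `absandlat` and `absorlat` have no expiration-based deletion
| DROP TABLE | 1. Dropping a table is asynchronous by default; after the command completes, the data in the table is deleted asynchronously <br> 2. If a partition is taking a snapshot, the drop fails. Some partitions may be deleted while others are not <br> 3. By default, the data directory is moved to the recycle directory on deletion. The `recycle_bin_enabled` parameter in the tablet configuration file controls whether data goes to recycle; it is enabled by default <br> 4. Because of memory fragmentation, freed memory is not necessarily fully returned to the operating system
| INSERT | If the statement returns a failure, some of the data may already have been inserted
| DELETE | 1. Deleted data is not physically removed from memory immediately; it is removed after one expiration-deletion interval (the `gc_interval` parameter) <br> 2. If a long window is configured, data in the pre-aggregation table is not updated
| DELETE | Deleted data is not physically removed from memory immediately; it is removed after one expiration-deletion interval (the `gc_interval` parameter)
| CREATE INDEX | 1. Creating an index is asynchronous; if the table contains data, it takes some time before the `desc` command shows the index <br> 2. If writes occur while the index is being created, some newly written data may not be queryable through the new index <br> 3. Disk tables do not support creating indexes
| DROP INDEX | 1. After dropping an index, you must wait two expiration-deletion intervals (the `gc_interval` parameter) before recreating the same index <br> 2. After the command runs, the in-memory index is not removed immediately; the actual removal happens after two expiration-deletion intervals <br> 3. Disk tables do not support dropping indexes
| DEPLOY | 1. The DEPLOY command may modify the TTL of related tables. Data imported before DEPLOY may be evicted before the new TTL takes effect; the new TTL takes effect after 2 `gc_interval` periods <br> 2. If a disk table among the tables associated with the deployment needs an index added, the deployment fails, and some indexes may already have been added successfully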
57 changes: 0 additions & 57 deletions docs/zh/quickstart/function_boundary.md
@@ -82,63 +82,6 @@ spark.default.conf=spark.port.maxRetries=32;foo=bar

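The maximum concurrency of a single task is limited to `spark.executor.instances` * `spark.executor.cores`; adjust these two settings. When `spark.master=local`, adjust the driver's settings rather than the executor's.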

### DELETE

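In tables with multiple indexes in the online storage, `DELETE` may not delete the corresponding data in all indexes. Consequently, data that has been deleted may still show up in query results.

For example: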

```SQL
create database db;
use db;
create table t1(c1 int, c2 int,index(key=c1),index(key=c2));
desc t1;
set @@execute_mode='online';
insert into t1 values (1,1),(2,2);
delete from t1 where c2=2;
select * from t1;
select * from t1 where c2=2;
```

The results are as follows:

```Plain
--- ------- ------ ------ ---------
Field Type Null Default
--- ------- ------ ------ ---------
1 c1 Int YES
2 c2 Int YES
--- ------- ------ ------ ---------
--- -------------------- ------ ---- ------ ---------------
name keys ts ttl ttl_type
--- -------------------- ------ ---- ------ ---------------
1 INDEX_0_1668504212 c1 - 0min kAbsoluteTime
2 INDEX_1_1668504212 c2 - 0min kAbsoluteTime
--- -------------------- ------ ---- ------ ---------------
--------------
storage_mode
--------------
Memory
--------------
---- ----
c1 c2
---- ----
1 1
2 2
---- ----
2 rows in set
---- ----
c1 c2
---- ----
0 rows in set
```

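Explanation:

Table `t1` has multiple indexes (`DEPLOY` may also create multiple indexes automatically). `delete from t1 where c2=2` actually deletes data only in the second index; data in the first index is unaffected. This is because the where condition of the delete relates only to the second index, and the first index contains no key or ts related to that condition. `select * from t1` uses the first index rather than the second, so it returns two rows, which looks as if the delete failed; `select * from t1 where c2=2` uses the second index and returns an empty result, showing that the data has been deleted under that index.

## DQL Boundary

The supported query modes (i.e. `SELECT` statements) vary depending on the execution mode: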
2 changes: 0 additions & 2 deletions docs/zh/quickstart/openmldb_quickstart.md
@@ -150,8 +150,6 @@ SET @@execute_mode='online';
SELECT * FROM demo_table1 LIMIT 10;
```

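Note that online data can currently be imported only after the SQL deployment has completed successfully; importing online data first causes the deployment to fail.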

```{note}
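This tutorial skips the real-time data ingestion step after the data import. In real scenarios, as time passes, the latest real-time data needs to be written to the online database. This can be done through the OpenMLDB SDK or an online data source connector (such as Kafka or Pulsar).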
```
79 changes: 79 additions & 0 deletions src/catalog/client_manager.cc
@@ -324,13 +324,33 @@ std::shared_ptr<::hybridse::vm::TableHandler> TabletAccessor::SubQuery(uint32_t
}
return async_table_handler;
}

void TabletsAccessor::AddTabletAccessor(std::shared_ptr<Tablet> accessor) {
    if (!accessor) {
        LOG(WARNING) << "Fail to add null tablet accessor";
        return;
    }
    auto iter = name_idx_map_.find(accessor->GetName());
    if (iter == name_idx_map_.cend()) {
        accessors_.push_back(accessor);
        name_idx_map_.insert(std::make_pair(accessor->GetName(), accessors_.size() - 1));
        posinfos_.push_back(std::vector<size_t>({rows_cnt_}));
        assign_accessor_idxs_.push_back(accessors_.size() - 1);
    } else {
        posinfos_[iter->second].push_back(rows_cnt_);
        assign_accessor_idxs_.push_back(iter->second);
    }
    rows_cnt_++;
}
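
The bookkeeping in `AddTabletAccessor` can be tried out in isolation. The sketch below is a standalone approximation that replaces tablet accessors with plain names; `AccessorBook` and its member names are hypothetical and only mirror the roles of `name_idx_map_`, `posinfos_`, `assign_accessor_idxs_`, and `rows_cnt_` in the hunk above.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for TabletsAccessor: it deduplicates accessors by
// name and records, for each input row, which unique accessor serves it.
struct AccessorBook {
    std::vector<std::string> names;                    // unique accessors, in first-seen order
    std::map<std::string, std::size_t> name_idx_map;   // accessor name -> index into names
    std::vector<std::vector<std::size_t>> posinfos;    // per accessor: row positions it serves
    std::vector<std::size_t> assign_idxs;              // per row: index of its accessor
    std::size_t rows_cnt = 0;                          // number of rows added so far

    void Add(const std::string& name) {
        auto iter = name_idx_map.find(name);
        if (iter == name_idx_map.end()) {
            // First time this accessor is seen: register it and start its row list.
            names.push_back(name);
            name_idx_map.emplace(name, names.size() - 1);
            posinfos.push_back(std::vector<std::size_t>{rows_cnt});
            assign_idxs.push_back(names.size() - 1);
        } else {
            // Known accessor: append the current row position to its list.
            posinfos[iter->second].push_back(rows_cnt);
            assign_idxs.push_back(iter->second);
        }
        rows_cnt++;
    }
};
```

Adding the names `a`, `b`, `a` in that order yields two unique accessors; rows 0 and 2 are grouped under the first, and row 1 under the second, so per-row results can later be mapped back to their original positions.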

std::shared_ptr<hybridse::vm::RowHandler> TabletsAccessor::SubQuery(uint32_t task_id, const std::string& db,
                                                                    const std::string& sql,
                                                                    const hybridse::codec::Row& row,
                                                                    const bool is_procedure, const bool is_debug) {
    return std::make_shared<::hybridse::vm::ErrorRowHandler>(::hybridse::common::kRpcError,
                                                             "TabletsAccessor Unsupport SubQuery with request");
}

std::shared_ptr<hybridse::vm::TableHandler> TabletsAccessor::SubQuery(uint32_t task_id, const std::string& db,
const std::string& sql,
const std::set<size_t>& common_column_indices,
@@ -350,6 +370,7 @@ std::shared_ptr<hybridse::vm::TableHandler> TabletsAccessor::SubQuery(uint32_t t
}
return tables_handler;
}

PartitionClientManager::PartitionClientManager(uint32_t pid, const std::shared_ptr<TabletAccessor>& leader,
const std::vector<std::shared_ptr<TabletAccessor>>& followers)
: pid_(pid), leader_(leader), followers_(followers), rand_(0xdeadbeef) {}
@@ -406,6 +427,29 @@ TableClientManager::TableClientManager(const ::openmldb::storage::TableSt& table
}
}

void TableClientManager::Show() const {
    DLOG(INFO) << "show client manager ";
    for (size_t id = 0; id < partition_managers_.size(); id++) {
        auto pmg = std::atomic_load_explicit(&partition_managers_[id], std::memory_order_relaxed);
        if (pmg) {
            if (pmg->GetLeader()) {
                DLOG(INFO) << "partition managers (pid, leader) " << id << ", " << pmg->GetLeader()->GetName();
            } else {
                DLOG(INFO) << "partition managers (pid, leader) " << id << ", null leader";
            }
        } else {
            DLOG(INFO) << "partition managers (pid, leader) " << id << ", null manager";
        }
    }
}

std::shared_ptr<PartitionClientManager> TableClientManager::GetPartitionClientManager(uint32_t pid) const {
    if (pid < partition_managers_.size()) {
        return std::atomic_load_explicit(&partition_managers_[pid], std::memory_order_relaxed);
    }
    return std::shared_ptr<PartitionClientManager>();
}

bool TableClientManager::UpdatePartitionClientManager(const ::openmldb::storage::PartitionSt& partition,
const ClientManager& client_manager) {
uint32_t pid = partition.GetPid();
@@ -429,6 +473,41 @@ bool TableClientManager::UpdatePartitionClientManager(const ::openmldb::storage:
return true;
}

std::shared_ptr<TabletAccessor> TableClientManager::GetTablet(uint32_t pid) const {
    auto partition_manager = GetPartitionClientManager(pid);
    if (partition_manager) {
        return partition_manager->GetLeader();
    }
    return std::shared_ptr<TabletAccessor>();
}

std::vector<std::shared_ptr<TabletAccessor>> TableClientManager::GetTabletFollowers(uint32_t pid) const {
    auto partition_manager = GetPartitionClientManager(pid);
    if (partition_manager) {
        return partition_manager->GetFollowers();
    }
    return {};
}

std::shared_ptr<TabletsAccessor> TableClientManager::GetTablet(std::vector<uint32_t> pids) const {
    auto tablets_accessor = std::make_shared<TabletsAccessor>();
    for (size_t idx = 0; idx < pids.size(); idx++) {
        auto partition_manager = GetPartitionClientManager(pids[idx]);
        if (partition_manager) {
            auto leader = partition_manager->GetLeader();
            if (!leader) {
                LOG(WARNING) << "fail to get TabletsAccessor, null tablet for pid " << pids[idx];
                return std::shared_ptr<TabletsAccessor>();
            }
            tablets_accessor->AddTabletAccessor(partition_manager->GetLeader());
        } else {
            LOG(WARNING) << "fail to get tablet: pid " << pids[idx] << " not exist";
            return std::shared_ptr<TabletsAccessor>();
        }
    }
    return tablets_accessor;
}
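
The multi-partition `GetTablet` overload is all-or-nothing: if any requested partition is unknown or has no leader, it returns a null pointer instead of a partial result. A minimal standalone sketch of that pattern follows, assuming leaders are represented as plain strings indexed by pid; `CollectLeaders` and its parameters are hypothetical names, not part of OpenMLDB.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <string>
#include <vector>

// Hypothetical helper: leaders_by_pid[pid] holds the leader name for a
// partition, or a null pointer when the partition currently has no leader.
// Returns the leaders for all requested pids, or nullptr if any is missing.
std::shared_ptr<std::vector<std::string>> CollectLeaders(
    const std::vector<std::shared_ptr<std::string>>& leaders_by_pid,
    const std::vector<std::uint32_t>& pids) {
    auto result = std::make_shared<std::vector<std::string>>();
    for (std::uint32_t pid : pids) {
        // Fail fast on an out-of-range pid or a partition without a leader.
        if (pid >= leaders_by_pid.size() || !leaders_by_pid[pid]) {
            return nullptr;
        }
        result->push_back(*leaders_by_pid[pid]);
    }
    return result;
}
```

Returning a null pointer forces the caller to handle the failure explicitly, rather than silently operating on a subset of the requested partitions.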

std::shared_ptr<TabletAccessor> ClientManager::GetTablet(const std::string& name) const {
std::lock_guard<::openmldb::base::SpinMutex> lock(mu_);
auto it = clients_.find(name);