Skip to content

Commit

Permalink
[FLASH-306] Check and try optimize overlapped data marks (#280)
Browse files Browse the repository at this point in the history
* check and optimize a table if the proportion of overlapped data marks is bigger than the threshold.

* fix handling of RaftCommandResult::Type::IndexError.
  • Loading branch information
solotzg authored Oct 23, 2019
1 parent e8c8ccd commit daee3ab
Show file tree
Hide file tree
Showing 18 changed files with 309 additions and 83 deletions.
39 changes: 39 additions & 0 deletions dbms/src/DataStreams/PKColumnIterator.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#pragma once

#include <Storages/Transaction/TiKVHandle.h>

namespace DB
{
/// Minimal iterator over the values of a primary-key column.
///
/// Dereferencing yields `column->getUInt(pos)`. Only pre-increment, `+=`,
/// copy-assignment and iterator difference are provided.
/// The iterator stores a raw pointer to the column and never frees it.
struct PKColumnIterator : public std::iterator<std::random_access_iterator_tag, UInt64, size_t>
{
    /// @param pos_    Row index the iterator points at. Taken as `size_t` (it used to be `int`)
    ///                so the index reaches the `size_t` member without a narrowing/sign conversion.
    /// @param column_ Column whose values are read on dereference.
    PKColumnIterator(const size_t pos_, const IColumn * column_) : pos(pos_), column(column_) {}

    /// Advance to the next row.
    PKColumnIterator & operator++()
    {
        ++pos;
        return *this;
    }

    /// Copy both the position and the column pointer from `itr`.
    PKColumnIterator & operator=(const PKColumnIterator & itr)
    {
        pos = itr.pos;
        column = itr.column;
        return *this;
    }

    /// Value of the column at the current row, read via `IColumn::getUInt`.
    UInt64 operator*() const { return column->getUInt(pos); }

    /// Distance between two positions. The arithmetic is unsigned, so `itr` is
    /// expected not to be ahead of `*this`.
    size_t operator-(const PKColumnIterator & itr) const { return pos - itr.pos; }

    /// Advance by `n` rows.
    void operator+=(size_t n) { pos += n; }

    size_t pos;
    const IColumn * column;
};

/// "Less than" predicate between a raw primary-key value and a typed handle.
/// The raw UInt64 is first converted to `HandleType`, then compared with `b`.
template <typename HandleType>
inline bool PkCmp(const UInt64 & a, const TiKVHandle::Handle<HandleType> & b)
{
    const HandleType key = static_cast<HandleType>(a);
    return key < b;
}
}
30 changes: 2 additions & 28 deletions dbms/src/DataStreams/RangesFilterBlockInputStream.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <Columns/ColumnsNumber.h>
#include <DataStreams/RangesFilterBlockInputStream.h>
#include <DataStreams/dedupUtils.h>
#include <DataStreams/PKColumnIterator.hpp>

namespace DB
{
Expand All @@ -10,37 +11,10 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}

/// Minimal iterator over the values of a primary-key column.
///
/// Dereferencing yields `column->getUInt(pos)`. Only pre-increment, `+=`,
/// copy-assignment and iterator difference are provided.
/// The iterator stores a raw pointer to the column and never frees it.
struct PKColumnIterator : public std::iterator<std::random_access_iterator_tag, UInt64, size_t>
{
    /// @param pos_    Row index the iterator points at. Taken as `size_t` (it used to be `int`)
    ///                so the index reaches the `size_t` member without a narrowing/sign conversion.
    /// @param column_ Column whose values are read on dereference.
    PKColumnIterator(const size_t pos_, const IColumn * column_) : pos(pos_), column(column_) {}

    /// Advance to the next row.
    PKColumnIterator & operator++()
    {
        ++pos;
        return *this;
    }

    /// Copy both the position and the column pointer from `itr`.
    PKColumnIterator & operator=(const PKColumnIterator & itr)
    {
        pos = itr.pos;
        column = itr.column;
        return *this;
    }

    /// Value of the column at the current row, read via `IColumn::getUInt`.
    UInt64 operator*() const { return column->getUInt(pos); }

    /// Distance between two positions. The arithmetic is unsigned, so `itr` is
    /// expected not to be ahead of `*this`.
    size_t operator-(const PKColumnIterator & itr) const { return pos - itr.pos; }

    /// Advance by `n` rows.
    void operator+=(size_t n) { pos += n; }

    size_t pos;
    const IColumn * column;
};

template <typename HandleType>
Block RangesFilterBlockInputStream<HandleType>::readImpl()
{
static const auto func_cmp = [](const UInt64 & a, const Handle & b) -> bool { return static_cast<HandleType>(a) < b; };
static const auto func_cmp = PkCmp<HandleType>;

while (true)
{
Expand Down
11 changes: 11 additions & 0 deletions dbms/src/Debug/ClusterManage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,15 @@ void ClusterManage::findRegionByRange(Context & context, const ASTs & args, Prin
}
}

/// Debug command handler: forwards a table id and a threshold to
/// `RegionTable::checkTableOptimize(table_id, threshold)`.
///
/// @param context Used to reach the TMTContext and its RegionTable.
/// @param args    args[0] is the table id (UInt64 literal), args[1] is the threshold (Decimal32 literal).
/// @throws Exception(BAD_ARGUMENTS) when fewer than two arguments are supplied.
void ClusterManage::checkTableOptimize(DB::Context & context, const DB::ASTs & args, DB::Printer)
{
    // Only args[0] and args[1] are consumed, matching the error message below.
    // The guard used to be `< 3`, which rejected valid two-argument calls.
    if (args.size() < 2)
        throw Exception("Args not matched, should be: table-id, threshold", ErrorCodes::BAD_ARGUMENTS);

    auto & tmt = context.getTMTContext();
    const TableID table_id = static_cast<TableID>(safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*args[0]).value));
    auto threshold = typeid_cast<const ASTLiteral &>(*args[1]).value.safeGet<DecimalField<Decimal32>>();
    // The decimal literal is converted to a float using its own scale before being passed on.
    tmt.getRegionTable().checkTableOptimize(table_id, threshold.getValue().toFloat<Float32>(threshold.getScale()));
}

} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Debug/ClusterManage.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,7 @@ struct ClusterManage
// Usage:
// ./storages-client.sh "DBGInvoke find_region_by_range(start_key, end_key)"
static void findRegionByRange(Context & context, const ASTs & args, Printer output);

// Usage:
// ./storages-client.sh "DBGInvoke check_table_optimize(table_id, threshold)"
static void checkTableOptimize(Context & context, const ASTs & args, Printer output);
};
} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/Debug/DBGInvoker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ DBGInvoker::DBGInvoker()
regSchemalessFunc("dump_all_mock_region", dbgFuncDumpAllMockRegion);
regSchemalessFunc("dump_region_table", ClusterManage::dumpRegionTable);
regSchemalessFunc("find_region_by_range", ClusterManage::findRegionByRange);
regSchemalessFunc("check_table_optimize", ClusterManage::checkTableOptimize);

regSchemalessFunc("enable_schema_sync_service", dbgFuncEnableSchemaSyncService);
regSchemalessFunc("refresh_schemas", dbgFuncRefreshSchemas);
Expand Down
20 changes: 15 additions & 5 deletions dbms/src/Raft/RaftService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,24 @@ RaftService::RaftService(DB::Context & db_context_)
if (!db_context.getTMTContext().isInitialized())
throw Exception("TMTContext is not initialized", ErrorCodes::LOGICAL_ERROR);

persist_handle = background_pool.addTask([this] { return kvstore->tryPersist(); }, false);
single_thread_task_handle = background_pool.addTask(
[this] {
auto & tmt = db_context.getTMTContext();
{
RegionTable & region_table = tmt.getRegionTable();
region_table.checkTableOptimize();
}
kvstore->tryPersist();
return false;
},
false);

table_flush_handle = background_pool.addTask([this] {
auto & tmt = db_context.getTMTContext();
RegionTable & region_table = tmt.getRegionTable();

// if all regions of a table are removed, try to optimize its data.
if (auto table_id = region_table.popOneTableToClean(); table_id != InvalidTableID)
if (auto table_id = region_table.popOneTableToOptimize(); table_id != InvalidTableID)
{
LOG_INFO(log, "try to final optimize table " << table_id);
tryOptimizeStorageFinal(db_context, table_id);
Expand Down Expand Up @@ -93,10 +103,10 @@ void RaftService::addRegionToDecode(const RegionPtr & region)

RaftService::~RaftService()
{
if (persist_handle)
if (single_thread_task_handle)
{
background_pool.removeTask(persist_handle);
persist_handle = nullptr;
background_pool.removeTask(single_thread_task_handle);
single_thread_task_handle = nullptr;
}
if (table_flush_handle)
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Raft/RaftService.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class RaftService final : public enginepb::Engine::Service, public std::enable_s
std::queue<RegionID> regions_to_flush;
RegionMap regions_to_decode;

BackgroundProcessingPool::TaskHandle persist_handle;
BackgroundProcessingPool::TaskHandle single_thread_task_handle;
BackgroundProcessingPool::TaskHandle table_flush_handle;
BackgroundProcessingPool::TaskHandle region_flush_handle;
BackgroundProcessingPool::TaskHandle region_decode_handle;
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
{
/// create TMTContext
global_context->createTMTContext(pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path, flash_server_addr);
global_context->getTMTContext().getRegionTable().setTableCheckerThreshold(config().getDouble("flash.overlap_threshold", 0.9));
}

/// Then, load remaining databases
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Transaction/CHTableHandle.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ inline std::tuple<int, std::array<UInt64TableHandleRange, 2>> splitForUInt64Tabl
throw Exception("splitForUInt64TableHandle should not happen", ErrorCodes::LOGICAL_ERROR);
}

template <typename HandleType>
inline void merge_ranges(std::vector<DB::HandleRange<HandleType>> & ranges)
template <typename Type>
inline void merge_ranges(std::vector<std::pair<Type, Type>> & ranges)
{
if (ranges.empty())
return;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Transaction/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ void KVStore::onServiceCommand(enginepb::CommandRequestBatch && cmds, RaftContex
switch (result.type)
{
case RaftCommandResult::Type::IndexError:
report_sync_log();
persist_and_sync();
break;
case RaftCommandResult::Type::BatchSplit:
handle_batch_split(result.split_regions);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Transaction/PartitionStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ void RegionTable::writeBlockByRegion(
if (!scanner.hasNext())
return;

data_list_to_remove.reserve(scanner.writeMapSize());
data_list_read.reserve(scanner.writeMapSize());

auto start_time = Clock::now();
do
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Transaction/Region.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ void Region::tryPreDecodeTiKVValue()
}

/// Returns the range keys held by the region state of the meta's raft command delegate.
const RegionRangeKeys & RegionRaftCommandDelegate::getRange() { return *meta.makeRaftCommandDelegate().regionState().getRange(); }
UInt64 RegionRaftCommandDelegate::appliedIndex() { return meta.makeRaftCommandDelegate().getApplyState().applied_index(); }
/// Returns the applied index read from the apply state of the meta's raft command delegate.
UInt64 RegionRaftCommandDelegate::appliedIndex() { return meta.makeRaftCommandDelegate().applyState().applied_index(); }
/// Returns, by value, the metapb::Region obtained from `meta`.
metapb::Region Region::getMetaRegion() const { return meta.getMetaRegion(); }
/// Returns, by value, the merge state obtained from `meta`.
raft_serverpb::MergeState Region::getMergeState() const { return meta.getMergeState(); }

Expand Down
6 changes: 1 addition & 5 deletions dbms/src/Storages/Transaction/RegionDataMover.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,7 @@ void tryOptimizeStorageFinal(Context & context, TableID table_id)
if (!storage)
return;

auto merge_tree = std::dynamic_pointer_cast<StorageMergeTree>(storage);
auto table_lock = merge_tree->lockStructure(true, __PRETTY_FUNCTION__);

if (merge_tree->is_dropped)
return;
auto table_lock = storage->lockStructure(true, __PRETTY_FUNCTION__);

std::stringstream ss;
ss << "OPTIMIZE TABLE `" << storage->getDatabaseName() << "`.`" << storage->getTableName() << "` PARTITION ID '0' FINAL";
Expand Down
17 changes: 15 additions & 2 deletions dbms/src/Storages/Transaction/RegionPersister.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ void RegionPersister::computeRegionWriteBuffer(const Region & region, RegionCach

region_id = region.id();
std::tie(region_size, applied_index) = region.serialize(buffer);
if (unlikely(region_size > std::numeric_limits<UInt32>::max()))
if (unlikely(region_size > static_cast<size_t>(std::numeric_limits<UInt32>::max())))
{
LOG_ERROR(&Logger::get("RegionPersister"),
region.toString() << " with data info: " << region.dataInfo() << ", serialized size " << region_size
Expand All @@ -33,7 +33,20 @@ void RegionPersister::computeRegionWriteBuffer(const Region & region, RegionCach
}
}

void RegionPersister::persist(const Region & region, const RegionTaskLock & lock) { doPersist(region, &lock); }
void RegionPersister::persist(const Region & region, const RegionTaskLock & lock)
{
UInt64 applied_index = region.appliedIndex();
{
std::lock_guard<std::mutex> lock(mutex);
auto cache = page_storage.getCache(region.id());
if (cache.isValid() && cache.tag == applied_index)
{
LOG_DEBUG(log, region.toString(false) << " ignore persist because of same applied index " << applied_index);
return;
}
}
doPersist(region, &lock);
}

/// Persist `region` without a region task lock: forwards to `doPersist` with a null lock pointer.
void RegionPersister::persist(const Region & region) { doPersist(region, nullptr); }

Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Storages/Transaction/RegionTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,14 +296,14 @@ void RegionTable::updateRegionForSplit(const Region & split_region, const Region
}
}

/// Removes one table id from the pending set while holding `mutex` and returns it.
/// Returns InvalidTableID when the set is empty.
TableID RegionTable::popOneTableToClean()
TableID RegionTable::popOneTableToOptimize()
{
TableID res = InvalidTableID;
std::lock_guard<std::mutex> lock(mutex);
if (auto it = table_to_clean.begin(); it != table_to_clean.end())
if (auto it = table_to_optimize.begin(); it != table_to_optimize.end())
{
res = *it;
table_to_clean.erase(it);
table_to_optimize.erase(it);
}
return res;
}
Expand All @@ -330,7 +330,7 @@ void RegionTable::removeRegion(const RegionID region_id)
auto & table = getOrCreateTable(table_id);
table.regions.erase(region_id);
if (table.regions.empty())
table_to_clean.emplace(table_id);
table_to_optimize.emplace(table_id);
}
}
}
Expand Down
72 changes: 43 additions & 29 deletions dbms/src/Storages/Transaction/RegionTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,34 +116,6 @@ class RegionTable : private boost::noncopyable
}
};

private:
TableMap tables;
RegionInfoMap regions;
std::unordered_set<RegionID> dirty_regions;
std::unordered_set<TableID> table_to_clean;

FlushThresholds flush_thresholds;

Context * const context;

mutable std::mutex mutex;
Logger * log;

private:
Table & getOrCreateTable(const TableID table_id);

InternalRegion & insertRegion(Table & table, const Region & region);
InternalRegion & getOrInsertRegion(TableID table_id, const Region & region);
InternalRegion & insertRegion(Table & table, const RegionRangeKeys & region_range_keys, const RegionID region_id);

bool shouldFlush(const InternalRegion & region) const;

void flushRegion(TableID table_id, RegionID region_id, const bool try_persist = true) const;

void doShrinkRegionRange(const Region & region);
void doUpdateRegion(const Region & region, TableID table_id);

public:
RegionTable(Context & context_);
void restore();

Expand All @@ -164,7 +136,7 @@ class RegionTable : private boost::noncopyable

void removeRegion(const RegionID region_id);

TableID popOneTableToClean();
TableID popOneTableToOptimize();

/// Try pick some regions and flush.
/// Note that flush is organized by partition, i.e. if a region is selected to be flushed, all regions belonging to its partition will also be flushed.
Expand Down Expand Up @@ -200,6 +172,48 @@ class RegionTable : private boost::noncopyable

TableIDSet getAllMappedTables(const RegionID region_id) const;
void dumpRegionsByTable(const TableID table_id, size_t & count, InternalRegions * regions) const;

/// Table-optimize check with no arguments; this is the form invoked from the RaftService background task.
/// NOTE(review): presumably uses the threshold stored via setTableCheckerThreshold — confirm in the implementation.
void checkTableOptimize();
/// Table-optimize check for a single table with an explicit threshold; this is the form
/// invoked by the `check_table_optimize` debug command.
void checkTableOptimize(TableID, const double);
/// Sets the checker threshold. The server calls this at startup with the
/// `flash.overlap_threshold` config value (default 0.9).
void setTableCheckerThreshold(double);

private:
Table & getOrCreateTable(const TableID table_id);

InternalRegion & insertRegion(Table & table, const Region & region);
InternalRegion & getOrInsertRegion(TableID table_id, const Region & region);
InternalRegion & insertRegion(Table & table, const RegionRangeKeys & region_range_keys, const RegionID region_id);

bool shouldFlush(const InternalRegion & region) const;

void flushRegion(TableID table_id, RegionID region_id, const bool try_persist = true) const;

void doShrinkRegionRange(const Region & region);
void doUpdateRegion(const Region & region, TableID table_id);

/// Bookkeeping for the table-optimize check, bundled with its own mutex.
/// NOTE(review): presumably read and written by checkTableOptimize / setTableCheckerThreshold — confirm in RegionTable.cpp.
struct TableOptimizeChecker
{
// NOTE(review): presumably guards the fields below — confirm against the users of this struct.
std::mutex mutex;
// Starts as false. NOTE(review): looks like a flag meaning a check is in progress — confirm.
bool is_checking = false;
// Defaults to 1.0 until changed.
double threshold = 1.0;
// Initialized to the construction time of this object.
Timepoint last_check_time = Clock::now();
};

private:
TableMap tables;
RegionInfoMap regions;
std::unordered_set<RegionID> dirty_regions;
std::unordered_set<TableID> table_to_optimize;

FlushThresholds flush_thresholds;

Context * const context;

mutable std::mutex mutex;

mutable TableOptimizeChecker table_checker;

Logger * log;
};

using RegionPartitionPtr = std::shared_ptr<RegionTable>;
Expand Down
Loading

0 comments on commit daee3ab

Please sign in to comment.