Skip to content

Commit

Permalink
Integrate some functions in KVStore and rename them to what they actu…
Browse files Browse the repository at this point in the history
…ally do (pingcap#7402)

ref pingcap#7256
  • Loading branch information
CalvinNeo committed Jul 3, 2023
1 parent 3d1893f commit 56eafc8
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 42 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgFuncRegion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ void dbgFuncTryFlushRegion(Context & context, const ASTs & args, DBGInvoker::Pri
auto region_id = static_cast<RegionID>(safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*args[0]).value));

TMTContext & tmt = context.getTMTContext();
tmt.getRegionTable().tryFlushRegion(region_id);
tmt.getRegionTable().tryWriteBlockByRegionAndFlush(region_id);

output(fmt::format("region_table try flush region {}", region_id));
}
Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Storages/Transaction/ApplySnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ void KVStore::checkAndApplyPreHandledSnapshot(const RegionPtrWrap & new_region,
// engine may delete data unsafely.
auto region_lock = region_manager.genRegionTaskLock(old_region->id());
old_region->setStateApplying();
tmt.getRegionTable().tryFlushRegion(old_region, false);
tmt.getRegionTable().tryWriteBlockByRegionAndFlush(old_region, false);
tryFlushRegionCacheInStorage(tmt, *old_region, log);
persistRegion(*old_region, region_lock, "save previous region before apply");
persistRegion(*old_region, &region_lock, "save previous region before apply");
}
}

Expand Down Expand Up @@ -209,7 +209,7 @@ void KVStore::onSnapshot(const RegionPtrWrap & new_region_wrap, RegionPtr old_re
{
try
{
auto tmp = region_table.tryFlushRegion(new_region_wrap, false);
auto tmp = region_table.tryWriteBlockByRegionAndFlush(new_region_wrap, false);
{
std::lock_guard lock(bg_gc_region_data_mutex);
bg_gc_region_data.push_back(std::move(tmp));
Expand Down Expand Up @@ -261,7 +261,7 @@ void KVStore::onSnapshot(const RegionPtrWrap & new_region_wrap, RegionPtr old_re
manage_lock.index.add(new_region);
}

persistRegion(*new_region, region_lock, "save current region after apply");
persistRegion(*new_region, &region_lock, "save current region after apply");

tmt.getRegionTable().shrinkRegionRange(*new_region);
}
Expand Down Expand Up @@ -506,7 +506,7 @@ EngineStoreApplyRes KVStore::handleIngestSST(UInt64 region_id, const SSTViewVec
return;
try
{
tmt.getRegionTable().tryFlushRegion(region, false);
tmt.getRegionTable().tryWriteBlockByRegionAndFlush(region, false);
tryFlushRegionCacheInStorage(tmt, *region, log);
}
catch (Exception & e)
Expand All @@ -533,7 +533,7 @@ EngineStoreApplyRes KVStore::handleIngestSST(UInt64 region_id, const SSTViewVec
}
else
{
persistRegion(*region, region_task_lock, __FUNCTION__);
persistRegion(*region, &region_task_lock, __FUNCTION__);
return EngineStoreApplyRes::Persist;
}
}
Expand Down
32 changes: 19 additions & 13 deletions dbms/src/Storages/Transaction/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,15 +172,12 @@ bool KVStore::tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & regi
return true;
}

void KVStore::tryPersist(RegionID region_id)
void KVStore::tryPersistRegion(RegionID region_id)
{
auto region = getRegion(region_id);
if (region)
{
LOG_INFO(log, "Try to persist {}", region->toString(false));
RUNTIME_CHECK_MSG(region_persister, "try access to region_persister without initialization, stack={}", StackTrace().toString());
region_persister->persist(*region);
LOG_INFO(log, "After persisted {}, cache {} bytes", region->toString(false), region->dataSize());
persistRegion(*region, std::nullopt, "");
}
}

Expand Down Expand Up @@ -333,12 +330,21 @@ void KVStore::setRegionCompactLogConfig(UInt64 sec, UInt64 rows, UInt64 bytes)
bytes);
}

void KVStore::persistRegion(const Region & region, const RegionTaskLock & region_task_lock, const char * caller)
void KVStore::persistRegion(const Region & region, std::optional<const RegionTaskLock *> region_task_lock, const char * caller)
{
LOG_INFO(log, "Start to persist {}, cache size: {} bytes for `{}`", region.toString(true), region.dataSize(), caller);
RUNTIME_CHECK_MSG(region_persister, "try access to region_persister without initialization, stack={}", StackTrace().toString());
region_persister->persist(region, region_task_lock);
LOG_DEBUG(log, "Persist {} done", region.toString(false));
if (region_task_lock.has_value())
{
LOG_INFO(log, "Start to persist {}, cache size: {} bytes for `{}`", region.toString(true), region.dataSize(), caller);
region_persister->persist(region, *region_task_lock.value());
LOG_DEBUG(log, "Persist {} done", region.toString(false));
}
else
{
LOG_INFO(log, "Try to persist {}", region.toString(false));
region_persister->persist(region);
LOG_INFO(log, "After persisted {}, cache {} bytes", region.toString(false), region.dataSize());
}
}

bool KVStore::needFlushRegionData(UInt64 region_id, TMTContext & tmt)
Expand Down Expand Up @@ -422,7 +428,7 @@ bool KVStore::forceFlushRegionDataImpl(Region & curr_region, bool try_until_succ
}
if (tryFlushRegionCacheInStorage(tmt, curr_region, log, try_until_succeed))
{
persistRegion(curr_region, region_task_lock, "tryFlushRegionData");
persistRegion(curr_region, &region_task_lock, "tryFlushRegionData");
curr_region.markCompactLog();
curr_region.cleanApproxMemCacheInfo();
GET_METRIC(tiflash_raft_apply_write_command_duration_seconds, type_flush_region).Observe(watch.elapsedSeconds());
Expand Down Expand Up @@ -474,7 +480,7 @@ EngineStoreApplyRes KVStore::handleUselessAdminRaftCmd(
|| cmd_type == raft_cmdpb::AdminCmdType::BatchSwitchWitness)
{
tryFlushRegionCacheInStorage(tmt, curr_region, log);
persistRegion(curr_region, region_task_lock, fmt::format("admin cmd useless {}", cmd_type).c_str());
persistRegion(curr_region, &region_task_lock, fmt::format("admin cmd useless {}", cmd_type).c_str());
return EngineStoreApplyRes::Persist;
}
return EngineStoreApplyRes::None;
Expand Down Expand Up @@ -539,7 +545,7 @@ EngineStoreApplyRes KVStore::handleAdminRaftCmd(raft_cmdpb::AdminRequest && requ
const auto try_to_flush_region = [&tmt](const RegionPtr & region) {
try
{
tmt.getRegionTable().tryFlushRegion(region, false);
tmt.getRegionTable().tryWriteBlockByRegionAndFlush(region, false);
}
catch (...)
{
Expand All @@ -549,7 +555,7 @@ EngineStoreApplyRes KVStore::handleAdminRaftCmd(raft_cmdpb::AdminRequest && requ

const auto persist_and_sync = [&](const Region & region) {
tryFlushRegionCacheInStorage(tmt, region, log);
persistRegion(region, region_task_lock, "admin raft cmd");
persistRegion(region, &region_task_lock, "admin raft cmd");
};

const auto handle_batch_split = [&](Regions & split_regions) {
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Transaction/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class KVStore final : private boost::noncopyable

void gcRegionPersistedCache(Seconds gc_persist_period = Seconds(60 * 5));

void tryPersist(RegionID region_id);
void tryPersistRegion(RegionID region_id);

static bool tryFlushRegionCacheInStorage(TMTContext & tmt, const Region & region, const LoggerPtr & log, bool try_until_succeed = true);

Expand Down Expand Up @@ -246,7 +246,7 @@ class KVStore final : private boost::noncopyable
bool canFlushRegionDataImpl(const RegionPtr & curr_region_ptr, UInt8 flush_if_possible, bool try_until_succeed, TMTContext & tmt, const RegionTaskLock & region_task_lock, UInt64 index, UInt64 term);
bool forceFlushRegionDataImpl(Region & curr_region, bool try_until_succeed, TMTContext & tmt, const RegionTaskLock & region_task_lock, UInt64 index, UInt64 term);

void persistRegion(const Region & region, const RegionTaskLock & region_task_lock, const char * caller);
void persistRegion(const Region & region, std::optional<const RegionTaskLock *> region_task_lock, const char * caller);
void releaseReadIndexWorkers();
void handleDestroy(UInt64 region_id, TMTContext & tmt, const KVStoreTaskLock &);

Expand Down
14 changes: 7 additions & 7 deletions dbms/src/Storages/Transaction/RegionTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ bool RegionTable::shouldFlush(const InternalRegion & region) const
return false;
}

RegionDataReadInfoList RegionTable::flushRegion(const RegionPtrWithBlock & region, bool try_persist) const
RegionDataReadInfoList RegionTable::writeBlockByRegionAndFlush(const RegionPtrWithBlock & region, bool try_persist) const
{
auto & tmt = context->getTMTContext();

Expand All @@ -140,7 +140,7 @@ RegionDataReadInfoList RegionTable::flushRegion(const RegionPtrWithBlock & regio
if (try_persist)
{
KVStore::tryFlushRegionCacheInStorage(tmt, *region, log);
tmt.getKVStore()->tryPersist(region->id());
tmt.getKVStore()->tryPersistRegion(region->id());
}
}

Expand Down Expand Up @@ -302,7 +302,7 @@ void RegionTable::removeRegion(const RegionID region_id, bool remove_data, const
}
}

RegionDataReadInfoList RegionTable::tryFlushRegion(RegionID region_id, bool try_persist)
RegionDataReadInfoList RegionTable::tryWriteBlockByRegionAndFlush(RegionID region_id, bool try_persist)
{
auto region = context->getTMTContext().getKVStore()->getRegion(region_id);
if (!region)
Expand All @@ -311,10 +311,10 @@ RegionDataReadInfoList RegionTable::tryFlushRegion(RegionID region_id, bool try_
return {};
}

return tryFlushRegion(region, try_persist);
return tryWriteBlockByRegionAndFlush(region, try_persist);
}

RegionDataReadInfoList RegionTable::tryFlushRegion(const RegionPtrWithBlock & region, bool try_persist)
RegionDataReadInfoList RegionTable::tryWriteBlockByRegionAndFlush(const RegionPtrWithBlock & region, bool try_persist)
{
RegionID region_id = region->id();

Expand Down Expand Up @@ -349,7 +349,7 @@ RegionDataReadInfoList RegionTable::tryFlushRegion(const RegionPtrWithBlock & re
RegionDataReadInfoList data_list_to_remove;
try
{
data_list_to_remove = flushRegion(region, try_persist);
data_list_to_remove = writeBlockByRegionAndFlush(region, try_persist);
}
catch (const Exception & e)
{
Expand Down Expand Up @@ -414,7 +414,7 @@ bool RegionTable::tryFlushRegions()
{
if (RegionID region_to_flush = pickRegionToFlush(); region_to_flush != InvalidRegionID)
{
tryFlushRegion(region_to_flush, true);
tryWriteBlockByRegionAndFlush(region_to_flush, true);
return true;
}

Expand Down
16 changes: 12 additions & 4 deletions dbms/src/Storages/Transaction/RegionTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,15 @@ class RegionTable : private boost::noncopyable

void removeRegion(RegionID region_id, bool remove_data, const RegionTaskLock &);

// Find all regions with data, call writeBlockByRegionAndFlush with try_persist = true.
// This function is only for debug.
bool tryFlushRegions();
RegionDataReadInfoList tryFlushRegion(RegionID region_id, bool try_persist = false);
RegionDataReadInfoList tryFlushRegion(const RegionPtrWithBlock & region, bool try_persist);

// Protects writeBlockByRegionAndFlush and ensures it's executed by only one thread at the smae time.
// Only one thread can do this at the same time.
// The original name for this function is tryFlushRegion.
RegionDataReadInfoList tryWriteBlockByRegionAndFlush(RegionID region_id, bool try_persist = false);
RegionDataReadInfoList tryWriteBlockByRegionAndFlush(const RegionPtrWithBlock & region, bool try_persist);

void handleInternalRegionsByTable(KeyspaceID keyspace_id, TableID table_id, std::function<void(const InternalRegions &)> && callback) const;
std::vector<std::pair<RegionID, RegionPtr>> getRegionsByTable(KeyspaceID keyspace_id, TableID table_id) const;
Expand Down Expand Up @@ -191,7 +197,6 @@ class RegionTable : private boost::noncopyable
/// extend range for possible InternalRegion or add one.
void extendRegionRange(RegionID region_id, const RegionRangeKeys & region_range_keys);


void updateSafeTS(UInt64 region_id, UInt64 leader_safe_ts, UInt64 self_safe_ts);

// unit: ms. If safe_ts diff is larger than 2min, we think the data synchronization progress is far behind the leader.
Expand All @@ -211,7 +216,10 @@ class RegionTable : private boost::noncopyable
InternalRegion & insertRegion(Table & table, const Region & region);
InternalRegion & doGetInternalRegion(KeyspaceTableID ks_tb_id, RegionID region_id);

RegionDataReadInfoList flushRegion(const RegionPtrWithBlock & region, bool try_persist) const;
// Try write the committed kvs into cache of columnar DeltaMergeStore.
// Flush the cache if try_persist is set to true.
// The original name for this method is flushRegion.
RegionDataReadInfoList writeBlockByRegionAndFlush(const RegionPtrWithBlock & region, bool try_persist) const;
bool shouldFlush(const InternalRegion & region) const;
RegionID pickRegionToFlush();

Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ TEST_F(RegionKVStoreTest, NewProxy)
}
}
{
kvs.tryPersist(1);
kvs.tryPersistRegion(1);
kvs.gcRegionPersistedCache(Seconds{0});
}
{
Expand Down Expand Up @@ -796,7 +796,7 @@ TEST_F(RegionKVStoreTest, KVStore)
}
}
{
kvs.tryPersist(1);
kvs.tryPersistRegion(1);
kvs.gcRegionPersistedCache(Seconds{0});
}
{
Expand Down Expand Up @@ -1329,9 +1329,9 @@ TEST_F(RegionKVStoreTest, KVStoreRestore)
lock.index.add(region);
}
}
kvs.tryPersist(1);
kvs.tryPersist(2);
kvs.tryPersist(3);
kvs.tryPersistRegion(1);
kvs.tryPersistRegion(2);
kvs.tryPersistRegion(3);
}
{
KVStore & kvs = reloadKVSFromDisk();
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Storages/Transaction/tests/gtest_new_kvstore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ try
proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index);
ASSERT_EQ(r1->getLatestAppliedIndex(), applied_index + 1);
ASSERT_EQ(kvr1->appliedIndex(), applied_index + 1);
kvs.tryPersist(region_id);
kvs.tryPersistRegion(region_id);
}
{
const KVStore & kvs = reloadKVSFromDisk();
Expand Down Expand Up @@ -70,7 +70,7 @@ try
proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index);
ASSERT_EQ(r1->getLatestAppliedIndex(), applied_index);
ASSERT_EQ(kvr1->appliedIndex(), applied_index);
kvs.tryPersist(region_id);
kvs.tryPersistRegion(region_id);
}
{
KVStore & kvs = reloadKVSFromDisk();
Expand Down Expand Up @@ -103,7 +103,7 @@ try
proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index);
ASSERT_EQ(r1->getLatestAppliedIndex(), applied_index);
ASSERT_EQ(kvr1->appliedIndex(), applied_index);
kvs.tryPersist(region_id);
kvs.tryPersistRegion(region_id);
}
{
KVStore & kvs = reloadKVSFromDisk();
Expand Down Expand Up @@ -135,7 +135,7 @@ try
proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index);
ASSERT_EQ(r1->getLatestAppliedIndex(), applied_index);
ASSERT_EQ(kvr1->appliedIndex(), applied_index + 1);
kvs.tryPersist(region_id);
kvs.tryPersistRegion(region_id);
}
{
MockRaftStoreProxy::FailCond cond;
Expand Down

0 comments on commit 56eafc8

Please sign in to comment.