diff --git a/dbms/src/Storages/Transaction/KVStore.cpp b/dbms/src/Storages/Transaction/KVStore.cpp index e63377460e9..c94ffe51fa0 100644 --- a/dbms/src/Storages/Transaction/KVStore.cpp +++ b/dbms/src/Storages/Transaction/KVStore.cpp @@ -10,10 +10,14 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -KVStore::KVStore(const std::string & data_dir, Context *, std::vector * regions_to_remove) : region_persister(data_dir), log(&Logger::get("KVStore")) +KVStore::KVStore(const std::string & data_dir) : region_persister(data_dir), log(&Logger::get("KVStore")) +{ +} + +void KVStore::restore(const Region::RegionClientCreateFunc & region_client_create, std::vector * regions_to_remove) { std::lock_guard lock(mutex); - region_persister.restore(regions); + region_persister.restore(regions, region_client_create); // Remove regions which pending_remove = true, those regions still exist because progress crash after persisted and before removal. if (regions_to_remove != nullptr) diff --git a/dbms/src/Storages/Transaction/KVStore.h b/dbms/src/Storages/Transaction/KVStore.h index 61eceee3a30..fb1c11091b2 100644 --- a/dbms/src/Storages/Transaction/KVStore.h +++ b/dbms/src/Storages/Transaction/KVStore.h @@ -26,7 +26,9 @@ static const Seconds KVSTORE_TRY_PERSIST_PERIOD(20); // 20 seconds class KVStore final : private boost::noncopyable { public: - KVStore(const std::string & data_dir, Context * context = nullptr, std::vector * regions_to_remove = nullptr); + KVStore(const std::string & data_dir); + void restore(const Region::RegionClientCreateFunc & region_client_create, std::vector * regions_to_remove = nullptr); + RegionPtr getRegion(RegionID region_id); void traverseRegions(std::function callback); diff --git a/dbms/src/Storages/Transaction/Region.cpp b/dbms/src/Storages/Transaction/Region.cpp index f1b5b0aa970..6714fe1f547 100644 --- a/dbms/src/Storages/Transaction/Region.cpp +++ b/dbms/src/Storages/Transaction/Region.cpp @@ -186,7 +186,7 @@ RegionPtr Region::splitInto(const RegionMeta & meta) RegionPtr new_region; if (client != nullptr) new_region = std::make_shared( - meta, std::make_shared(client->cache, client->client, meta.getRegionVerID())); + meta, [&](pingcap::kv::RegionVerID) { return std::make_shared(client->cache, client->client, meta.getRegionVerID()); }); else new_region = std::make_shared(meta); @@ -460,14 +460,14 @@ size_t Region::serialize(WriteBuffer & buf) return total_size; } -RegionPtr Region::deserialize(ReadBuffer & buf) +RegionPtr Region::deserialize(ReadBuffer & buf, const RegionClientCreateFunc & region_client_create) { auto version = readBinary2(buf); if (version != Region::CURRENT_VERSION) throw Exception("Unexpected region version: " + DB::toString(version) + ", expected: " + DB::toString(CURRENT_VERSION), ErrorCodes::UNKNOWN_FORMAT_VERSION); - auto region = std::make_shared(RegionMeta::deserialize(buf)); + auto region = std::make_shared(RegionMeta::deserialize(buf), region_client_create); auto size = readBinary2(buf); for (size_t i = 0; i < size; ++i) diff --git a/dbms/src/Storages/Transaction/Region.h b/dbms/src/Storages/Transaction/Region.h index 42327acd268..50afc7d2e24 100644 --- a/dbms/src/Storages/Transaction/Region.h +++ b/dbms/src/Storages/Transaction/Region.h @@ -155,12 +155,14 @@ class Region : public std::enable_shared_from_this explicit Region(const RegionMeta & meta_) : meta(meta_), client(nullptr), log(&Logger::get("Region")) {} - explicit Region(RegionMeta && meta_, pingcap::kv::RegionClientPtr client_) - : meta(std::move(meta_)), client(client_), log(&Logger::get("Region")) + using RegionClientCreateFunc = std::function; + + explicit Region(RegionMeta && meta_, const RegionClientCreateFunc & region_client_create) + : meta(std::move(meta_)), client(region_client_create(meta.getRegionVerID ())), log(&Logger::get("Region")) {} - explicit Region(const RegionMeta & meta_, const pingcap::kv::RegionClientPtr & client_) - : meta(meta_), client(client_), log(&Logger::get("Region")) + explicit Region(const RegionMeta & meta_, const RegionClientCreateFunc & region_client_create) + : meta(meta_), client(region_client_create(meta.getRegionVerID())), log(&Logger::get("Region")) {} TableID insert(const std::string & cf, const TiKVKey & key, const TiKVValue & value); @@ -174,7 +176,7 @@ class Region : public std::enable_shared_from_this std::unique_ptr createCommittedScanRemover(TableID expected_table_id); size_t serialize(WriteBuffer & buf); - static RegionPtr deserialize(ReadBuffer & buf); + static RegionPtr deserialize(ReadBuffer & buf, const RegionClientCreateFunc & region_client_create); void calculateCfCrc32(Crc32 & crc32) const; diff --git a/dbms/src/Storages/Transaction/RegionFile.cpp b/dbms/src/Storages/Transaction/RegionFile.cpp index 3d02bc2c363..54920f40eaf 100644 --- a/dbms/src/Storages/Transaction/RegionFile.cpp +++ b/dbms/src/Storages/Transaction/RegionFile.cpp @@ -87,10 +87,10 @@ RegionID RegionFile::Reader::hasNext() return next_region_meta->region_id; } -RegionPtr RegionFile::Reader::next() +RegionPtr RegionFile::Reader::next(const Region::RegionClientCreateFunc & region_create_func) { next_region_offset += next_region_meta->region_size; - return Region::deserialize(data_file_buf); + return Region::deserialize(data_file_buf, region_create_func); } void RegionFile::Reader::skipNext() diff --git a/dbms/src/Storages/Transaction/RegionFile.h b/dbms/src/Storages/Transaction/RegionFile.h index 5ed83e4acb9..a560c87d1e1 100644 --- a/dbms/src/Storages/Transaction/RegionFile.h +++ b/dbms/src/Storages/Transaction/RegionFile.h @@ -65,7 +65,7 @@ class RegionFile void checkHash(const std::vector & use); RegionID hasNext(); - RegionPtr next(); + RegionPtr next(const Region::RegionClientCreateFunc & region_create_func); void skipNext(); const std::vector & regionMetas() { return metas; } diff --git a/dbms/src/Storages/Transaction/RegionPersister.cpp b/dbms/src/Storages/Transaction/RegionPersister.cpp index eabf83f777c..24caf25b1ce 100644 --- a/dbms/src/Storages/Transaction/RegionPersister.cpp +++ b/dbms/src/Storages/Transaction/RegionPersister.cpp @@ -121,7 +121,7 @@ std::vector valid_regions_in_file(std::vector persist_lock(persist_mutex); std::lock_guard map_lock(region_map_mutex); @@ -171,7 +171,7 @@ void RegionPersister::restore(RegionMap & regions) { if (use[index]) { - regions.emplace(region_id, reader.next()); + regions.emplace(region_id, reader.next(region_client_create)); file->addRegion(region_id, metas[index].region_size); } else @@ -251,7 +251,7 @@ bool RegionPersister::gc() { if (use[index] && migrate_region_ids.count(region_id)) { - auto region = reader.next(); + auto region = reader.next([](pingcap::kv::RegionVerID) -> pingcap::kv::RegionClientPtr { return nullptr; }); auto region_size = gc_file_writer.write(region); { std::lock_guard map_lock(region_map_mutex); diff --git a/dbms/src/Storages/Transaction/RegionPersister.h b/dbms/src/Storages/Transaction/RegionPersister.h index f3f34a54ab5..37ef63e59a2 100644 --- a/dbms/src/Storages/Transaction/RegionPersister.h +++ b/dbms/src/Storages/Transaction/RegionPersister.h @@ -58,7 +58,7 @@ class RegionPersister final : private boost::noncopyable void drop(UInt64 region_id); void persist(const RegionPtr & region); - void restore(RegionMap &); + void restore(RegionMap &, const Region::RegionClientCreateFunc &); bool gc(); // For tests. diff --git a/dbms/src/Storages/Transaction/TMTContext.cpp b/dbms/src/Storages/Transaction/TMTContext.cpp index 7add404f7ce..5ec91fbfe5e 100644 --- a/dbms/src/Storages/Transaction/TMTContext.cpp +++ b/dbms/src/Storages/Transaction/TMTContext.cpp @@ -7,13 +7,17 @@ namespace DB { TMTContext::TMTContext(Context & context, std::vector addrs) - : kvstore(std::make_shared(context.getPath() + "kvstore/", &context, ®ions_to_remove)), + : kvstore(std::make_shared(context.getPath() + "kvstore/")), region_table(context, context.getPath() + "regmap/", std::bind(&KVStore::getRegion, kvstore.get(), std::placeholders::_1)), schema_syncer(std::make_shared()), pd_client(addrs.size() == 0 ? static_cast(new pingcap::pd::MockPDClient()) : static_cast(new pingcap::pd::Client(addrs))), - region_cache(std::make_shared(pd_client)) + region_cache(std::make_shared(pd_client)), + rpc_client(std::make_shared()) { + kvstore->restore([&](pingcap::kv::RegionVerID id) -> pingcap::kv::RegionClientPtr { + return this->createRegionClient(id); + }, ®ions_to_remove); for (RegionID id : regions_to_remove) kvstore->removeRegion(id, &context); regions_to_remove.clear(); @@ -44,12 +48,16 @@ void TMTContext::setPDClient(pingcap::pd::ClientPtr rhs) pd_client = rhs; } +pingcap::kv::RegionClientPtr TMTContext::createRegionClient(pingcap::kv::RegionVerID region_version_id) const +{ + std::lock_guard lock(mutex); + return pd_client->isMock() ? nullptr : std::make_shared(region_cache, rpc_client, region_version_id); +} + pingcap::kv::RegionCachePtr TMTContext::getRegionCache() const { return region_cache; } pingcap::kv::RpcClientPtr TMTContext::getRpcClient() { - if (rpc_client == nullptr) - rpc_client = std::make_shared(); return rpc_client; } diff --git a/dbms/src/Storages/Transaction/TMTContext.h b/dbms/src/Storages/Transaction/TMTContext.h index c4e363ac7b9..db5724d6d61 100644 --- a/dbms/src/Storages/Transaction/TMTContext.h +++ b/dbms/src/Storages/Transaction/TMTContext.h @@ -30,6 +30,8 @@ class TMTContext pingcap::pd::ClientPtr getPDClient() const; void setPDClient(pingcap::pd::ClientPtr); + pingcap::kv::RegionClientPtr createRegionClient(pingcap::kv::RegionVerID region_version_id) const; + pingcap::kv::RegionCachePtr getRegionCache() const; pingcap::kv::RpcClientPtr getRpcClient(); diff --git a/dbms/src/Storages/Transaction/applySnapshot.cpp b/dbms/src/Storages/Transaction/applySnapshot.cpp index de7bb419f5a..15d433f05f2 100644 --- a/dbms/src/Storages/Transaction/applySnapshot.cpp +++ b/dbms/src/Storages/Transaction/applySnapshot.cpp @@ -20,15 +20,15 @@ void applySnapshot(KVStorePtr kvstore, RequestReader read, Context * context) const auto & state = request.state(); pingcap::kv::RegionClientPtr region_client = nullptr; auto meta = RegionMeta(state.peer(), state.region(), state.apply_state()); - if (context) { - auto & tmt_ctx = context->getTMTContext(); - auto pd_client = tmt_ctx.getPDClient(); - if (!pd_client->isMock()) { - auto region_cache = tmt_ctx.getRegionCache(); - region_client = std::make_shared(region_cache, tmt_ctx.getRpcClient(), meta.getRegionVerID()); + Region::RegionClientCreateFunc region_client_create = [&](pingcap::kv::RegionVerID id) -> pingcap::kv::RegionClientPtr { + if (context) + { + auto & tmt_ctx = context->getTMTContext(); + return tmt_ctx.createRegionClient(id); } - } - auto region = std::make_shared(meta, region_client); + return nullptr; + }; + auto region = std::make_shared(meta, region_client_create); LOG_INFO(log, "Region " << region->id() << " apply snapshot " << region->toString(true));