diff --git a/src/clients/meta/MetaClient.cpp b/src/clients/meta/MetaClient.cpp index 961d00f8162..bf5ca8f6d44 100644 --- a/src/clients/meta/MetaClient.cpp +++ b/src/clients/meta/MetaClient.cpp @@ -694,6 +694,7 @@ void MetaClient::getResponse(Request req, return; } else { LOG(ERROR) << "Send request to " << host << ", exceed retry limit"; + LOG(ERROR) << "RpcResponse exception: " << t.exception().what().c_str(); pro.setValue( Status::Error("RPC failure in MetaClient: %s", t.exception().what().c_str())); } @@ -3532,6 +3533,21 @@ folly::Future> MetaClient::killQuery( return future; } +folly::Future> MetaClient::getWorkerId(std::string ipAddr) { + cpp2::GetWorkerIdReq req; + req.host_ref() = std::move(ipAddr); + + folly::Promise> promise; + auto future = promise.getFuture(); + getResponse( + std::move(req), + [](auto client, auto request) { return client->future_getWorkerId(request); }, + [](cpp2::GetWorkerIdResp&& resp) -> int64_t { return std::move(resp).get_workerid(); }, + std::move(promise), + true); + return future; +} + folly::Future> MetaClient::download(const std::string& hdfsHost, int32_t hdfsPort, const std::string& hdfsPath, diff --git a/src/clients/meta/MetaClient.h b/src/clients/meta/MetaClient.h index 38ae54353ff..b1b41f7df13 100644 --- a/src/clients/meta/MetaClient.h +++ b/src/clients/meta/MetaClient.h @@ -651,6 +651,8 @@ class MetaClient { folly::Future> ingest(GraphSpaceID spaceId); + folly::Future> getWorkerId(std::string ipAddr); + HostAddr getMetaLeader() { return leader_; } @@ -659,6 +661,10 @@ class MetaClient { return heartbeatTime_; } + std::string getLocalIp() { + return options_.localHost_.toString(); + } + protected: // Return true if load succeeded. bool loadData(); diff --git a/src/common/CMakeLists.txt b/src/common/CMakeLists.txt index a1a45aaff41..a3470a8baa0 100644 --- a/src/common/CMakeLists.txt +++ b/src/common/CMakeLists.txt @@ -27,3 +27,4 @@ nebula_add_subdirectory(utils) nebula_add_subdirectory(ssl) nebula_add_subdirectory(geo) nebula_add_subdirectory(memory) +nebula_add_subdirectory(id) diff --git a/src/common/expression/UUIDExpression.cpp b/src/common/expression/UUIDExpression.cpp index 02393a05c41..e9e51010196 100644 --- a/src/common/expression/UUIDExpression.cpp +++ b/src/common/expression/UUIDExpression.cpp @@ -10,36 +10,26 @@ namespace nebula { bool UUIDExpression::operator==(const Expression& rhs) const { - if (kind_ != rhs.kind()) { - return false; - } - - const auto& r = static_cast(rhs); - return field_ == r.field_; + return kind_ == rhs.kind(); } void UUIDExpression::writeTo(Encoder& encoder) const { // kind_ + UNUSED(encoder); encoder << kind_; - - // field_ - CHECK(!field_.empty()); - encoder << field_; } void UUIDExpression::resetFrom(Decoder& decoder) { - // Read field_ - field_ = decoder.readStr(); + UNUSED(decoder); } const Value& UUIDExpression::eval(ExpressionContext& ctx) { - // TODO UNUSED(ctx); return result_; } std::string UUIDExpression::toString() const { - return folly::stringPrintf("uuid(\"%s\")", field_.c_str()); + return folly::stringPrintf("uuid()"); } void UUIDExpression::accept(ExprVisitor* visitor) { diff --git a/src/common/expression/UUIDExpression.h b/src/common/expression/UUIDExpression.h index ed53879e02d..febcfc32d6a 100644 --- a/src/common/expression/UUIDExpression.h +++ b/src/common/expression/UUIDExpression.h @@ -14,8 +14,8 @@ class UUIDExpression final : public Expression { friend class Expression; public: - static UUIDExpression* make(ObjectPool* pool, const std::string& field = "") { - return pool->add(new UUIDExpression(pool, field)); + static UUIDExpression* make(ObjectPool* pool) { + return pool->add(new UUIDExpression(pool)); } bool operator==(const Expression& rhs) const override; @@ -27,18 +27,16 @@ class UUIDExpression final : public Expression { void accept(ExprVisitor* visitor) override; Expression* clone() const override { - return UUIDExpression::make(pool_, field_); + return UUIDExpression::make(pool_); } private: - explicit UUIDExpression(ObjectPool* pool, const std::string& field = "") - : Expression(pool, Kind::kUUID), field_(field) {} + explicit UUIDExpression(ObjectPool* pool) : Expression(pool, Kind::kUUID) {} void writeTo(Encoder& encoder) const override; void resetFrom(Decoder& decoder) override; private: - std::string field_; Value result_; }; diff --git a/src/common/expression/test/EncodeDecodeTest.cpp b/src/common/expression/test/EncodeDecodeTest.cpp index 090e63594ce..95f48cb7ea6 100644 --- a/src/common/expression/test/EncodeDecodeTest.cpp +++ b/src/common/expression/test/EncodeDecodeTest.cpp @@ -205,7 +205,7 @@ TEST(ExpressionEncodeDecode, TypeCastingExpression) { } TEST(ExpressionEncodeDecode, UUIDExpression) { - const auto& uuidEx = *UUIDExpression::make(&pool, "field"); + const auto& uuidEx = *UUIDExpression::make(&pool); std::string encoded = Expression::encode(uuidEx); auto decoded = Expression::decode(&pool, folly::StringPiece(encoded.data(), encoded.size())); EXPECT_EQ(uuidEx, *decoded); diff --git a/src/common/expression/test/ExpressionTest.cpp b/src/common/expression/test/ExpressionTest.cpp index 43cc6a34693..68f9417c374 100644 --- a/src/common/expression/test/ExpressionTest.cpp +++ b/src/common/expression/test/ExpressionTest.cpp @@ -142,7 +142,7 @@ TEST_F(ExpressionTest, TestExprClone) { auto fnCallExpr = FunctionCallExpression::make(&pool, "count", ArgumentList::make(&pool)); ASSERT_EQ(*fnCallExpr, *fnCallExpr->clone()); - auto uuidExpr = UUIDExpression::make(&pool, "hello"); + auto uuidExpr = UUIDExpression::make(&pool); ASSERT_EQ(*uuidExpr, *uuidExpr->clone()); auto subExpr = SubscriptExpression::make( diff --git a/src/common/id/CMakeLists.txt b/src/common/id/CMakeLists.txt new file mode 100644 index 00000000000..e60a8a76ebe --- /dev/null +++ b/src/common/id/CMakeLists.txt @@ -0,0 +1,11 @@ +# Copyright (c) 2021 vesoft inc. All rights reserved. +# +# This source code is licensed under Apache 2.0 License, +# attached with Common Clause Condition 1.0, found in the LICENSES directory. + +nebula_add_library( + snowflake_obj OBJECT + Snowflake.cpp +) + +nebula_add_subdirectory(test) diff --git a/src/common/id/Snowflake.cpp b/src/common/id/Snowflake.cpp new file mode 100644 index 00000000000..021330009f9 --- /dev/null +++ b/src/common/id/Snowflake.cpp @@ -0,0 +1,64 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#include "common/id/Snowflake.h" + +namespace nebula { +void Snowflake::initWorkerId(meta::MetaClient* client) { + const std::string& ip = client->getLocalIp(); + auto result = client->getWorkerId(ip).get(); + if (!result.ok()) { + LOG(FATAL) << "Failed to get workerId from meta server"; + } + workerId_ = result.value(); + LOG(INFO) << "WorkerId init success: " << workerId_; +} + +int64_t Snowflake::getId() { + std::lock_guard guard(mutex_); + + int64_t timestamp = getTimestamp(); + if (timestamp < lastTimestamp_) { + LOG(ERROR) << "Clock back"; + return kFirstBitRevert & getIdByTs(timestamp); + } + + return getIdByTs(timestamp); +} + +// get snowflake id by timestamp +// update lastTimestamp_ or sequence_ +int64_t Snowflake::getIdByTs(int64_t timestamp) { + // if it is the same time, then the microsecond sequence + if (lastTimestamp_ == timestamp) { + sequence_ = (sequence_ + 1) & kMaxSequence; + // if the microsecond sequence overflow + if (sequence_ == 0) { + // block to the next millisecond, get the new timestamp + timestamp = nextTimestamp(); + } + } else { + sequence_ = 0; + } + lastTimestamp_ = timestamp; + return (timestamp - kStartStmp) << kTimestampLeft | workerId_ << kWorkerIdLeft | sequence_; +} + +int64_t Snowflake::getTimestamp() { + return std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); +} + +// wait for the next millisecond +int64_t Snowflake::nextTimestamp() { + int64_t timestamp = getTimestamp(); + while (timestamp <= lastTimestamp_) { + timestamp = getTimestamp(); + } + return timestamp; +} + +} // namespace nebula diff --git a/src/common/id/Snowflake.h b/src/common/id/Snowflake.h new file mode 100644 index 00000000000..ae45f2e5a93 --- /dev/null +++ b/src/common/id/Snowflake.h @@ -0,0 +1,60 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#ifndef COMMON_ID_SNOWFLAKE_H_ +#define COMMON_ID_SNOWFLAKE_H_ + +#include "clients/meta/MetaClient.h" +#include "common/base/Base.h" + +namespace nebula { +class Snowflake { + FRIEND_TEST(SnowflakeTest, TestWorkerId); + FRIEND_TEST(SnowflakeTest, TestConcurrency); + friend int64_t getSequence(int64_t id); + friend int64_t getWorkerId(int64_t id); + friend int64_t getTimestamp(int64_t id); + + public: + Snowflake() = default; + + static void initWorkerId(meta::MetaClient* client); + + int64_t getId(); + + private: + static int64_t getTimestamp(); + + int64_t nextTimestamp(); + + int64_t getIdByTs(int64_t timestamp); + + std::mutex mutex_; + /* + * Snowflake id: + * timestampBit 38 bits (8.6 years) | + * workerBit 13 bits (8k workerid) | + * sequenceBit 12 bits (4 million per second) | + */ + int64_t lastTimestamp_{-1}; + static inline int64_t workerId_{0}; + int64_t sequence_{0}; + + static constexpr int64_t kStartStmp = 1577808000000; // start time: 2020-01-01 00:00:00 + static constexpr int64_t kWorkerBit = 13; // worker id bit + static constexpr int64_t kSequenceBit = 12; // sequence bit + + static constexpr int64_t kMaxWorkerId = (1 << kWorkerBit) - 1; // as worker id mask + static constexpr int64_t kMaxSequence = (1 << kSequenceBit) - 1; + + static constexpr int64_t kWorkerIdLeft = kSequenceBit; // workerId left bits + static constexpr int64_t kTimestampLeft = kSequenceBit + kWorkerBit; + + static constexpr int64_t kFirstBitRevert = 0x9000000000000000; // revert the first bit +}; + +} // namespace nebula + +#endif // COMMON_ID_SNOWFLAKE_H_ diff --git a/src/common/id/test/CMakeLists.txt b/src/common/id/test/CMakeLists.txt new file mode 100644 index 00000000000..347cd19a373 --- /dev/null +++ b/src/common/id/test/CMakeLists.txt @@ -0,0 +1,80 @@ +# Copyright (c) 2021 vesoft inc. All rights reserved. +# +# This source code is licensed under Apache 2.0 License, + +nebula_add_executable( + NAME snowflake_bm + SOURCES SnowflakeBenchmark.cpp + OBJECTS + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + LIBRARIES + follybenchmark + boost_regex + ${PROXYGEN_LIBRARIES} + ${THRIFT_LIBRARIES} +) + + +nebula_add_test( + NAME snowflake_test + SOURCES SnowflakeTest.cpp + OBJECTS + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + LIBRARIES + gtest + gtest_main + ${PROXYGEN_LIBRARIES} + ${THRIFT_LIBRARIES} +) diff --git a/src/common/id/test/SnowflakeBenchmark.cpp b/src/common/id/test/SnowflakeBenchmark.cpp new file mode 100644 index 00000000000..976cec1c3ee --- /dev/null +++ b/src/common/id/test/SnowflakeBenchmark.cpp @@ -0,0 +1,84 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#include + +#include +#include + +#include "common/base/Base.h" +#include "common/id/Snowflake.h" + +size_t SnowflakeTest(size_t iters) { + constexpr size_t ops = 10UL; + + nebula::Snowflake generator; + auto i = 0UL; + while (i++ < ops * iters) { + int64_t id = generator.getId(); + folly::doNotOptimizeAway(id); + } + + return iters * ops; +} + +size_t SnowflakeCurrencyTest(size_t iters, int threadNum) { + constexpr size_t ops = 100000UL; + + nebula::Snowflake generator; + std::vector threads; + threads.reserve(threadNum); + + auto proc = [&iters, &generator]() { + auto n = iters * ops; + for (auto i = 0UL; i < n; i++) { + int64_t id = generator.getId(); + folly::doNotOptimizeAway(id); + } + }; + + for (int i = 0; i < threadNum; i++) { + threads.emplace_back(std::thread(proc)); + } + + for (int i = 0; i < threadNum; i++) { + threads[i].join(); + } + + return iters * ops * threadNum; +} + +BENCHMARK_NAMED_PARAM_MULTI(SnowflakeTest, 1UL) +BENCHMARK_DRAW_LINE(); + +BENCHMARK_NAMED_PARAM_MULTI(SnowflakeCurrencyTest, 1_thread, 1) +BENCHMARK_RELATIVE_NAMED_PARAM_MULTI(SnowflakeCurrencyTest, 2_thread, 2) +BENCHMARK_RELATIVE_NAMED_PARAM_MULTI(SnowflakeCurrencyTest, 4_thread, 4) +BENCHMARK_RELATIVE_NAMED_PARAM_MULTI(SnowflakeCurrencyTest, 8_thread, 8) +BENCHMARK_RELATIVE_NAMED_PARAM_MULTI(SnowflakeCurrencyTest, 16_thread, 16) +BENCHMARK_RELATIVE_NAMED_PARAM_MULTI(SnowflakeCurrencyTest, 32_thread, 32) +BENCHMARK_RELATIVE_NAMED_PARAM_MULTI(SnowflakeCurrencyTest, 64_thread, 64) + +int main(int argc, char** argv) { + gflags::ParseCommandLineFlags(&argc, &argv, true); + folly::runBenchmarks(); + return 0; +} + +/* Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +============================================================================ +nebula/src/common/id/test/SnowflakeBenchmark.cpprelative time/iter iters/s +============================================================================ +SnowflakeTest(1UL) 68.03ns 14.70M +---------------------------------------------------------------------------- +SnowflakeCurrencyTest(1_thread) 239.12ns 4.18M +SnowflakeCurrencyTest(2_thread) 99.77% 239.66ns 4.17M +SnowflakeCurrencyTest(4_thread) 98.65% 242.39ns 4.13M +SnowflakeCurrencyTest(8_thread) 90.01% 265.67ns 3.76M +SnowflakeCurrencyTest(16_thread) 68.23% 350.46ns 2.85M +SnowflakeCurrencyTest(32_thread) 60.05% 398.22ns 2.51M +SnowflakeCurrencyTest(64_thread) 52.51% 455.38ns 2.20M +============================================================================ +*/ diff --git a/src/common/id/test/SnowflakeTest.cpp b/src/common/id/test/SnowflakeTest.cpp new file mode 100644 index 00000000000..d1ca39c3642 --- /dev/null +++ b/src/common/id/test/SnowflakeTest.cpp @@ -0,0 +1,68 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#include +#include +#include + +#include +#include + +#include "common/base/Logging.h" +#include "common/id/Snowflake.h" + +namespace nebula { + +int64_t getSequence(int64_t id) { + return id & Snowflake::kMaxSequence; +} +int64_t getWorkerId(int64_t id) { + return (id >> Snowflake::kSequenceBit) & Snowflake::kMaxWorkerId; +} +int64_t getTimestamp(int64_t id) { + return id >> (Snowflake::kSequenceBit + Snowflake::kWorkerBit); +} + +TEST(SnowflakeTest, TestWorkerId) { + int64_t maxWorkerId = 1023; + + Snowflake generator; + + for (int i = 0; i < maxWorkerId; i++) { + Snowflake::workerId_ = i; + int64_t id = generator.getId(); + + ASSERT_EQ(getWorkerId(id), i); + } +} + +TEST(SnowflakeTest, TestConcurrency) { + Snowflake::workerId_ = 0; + int threadNum = 16; + int times = 1000; + + Snowflake generator; + folly::ConcurrentHashMap map; + std::vector threads; + threads.reserve(threadNum); + + auto proc = [&]() { + for (int i = 0; i < times; i++) { + int64_t id = generator.getId(); + ASSERT_TRUE(map.find(id) == map.end()) << "id: " << id; + map.insert(id, 0); + } + }; + + for (int i = 0; i < threadNum; i++) { + threads.emplace_back(std::thread(proc)); + } + + for (int i = 0; i < threadNum; i++) { + threads[i].join(); + } +} + +} // namespace nebula diff --git a/src/daemons/CMakeLists.txt b/src/daemons/CMakeLists.txt index 8d8d68e680f..1d476baf543 100644 --- a/src/daemons/CMakeLists.txt +++ b/src/daemons/CMakeLists.txt @@ -7,6 +7,7 @@ set(common_deps $ $ $ + $ $ $ $ diff --git a/src/graph/service/GraphServer.cpp b/src/graph/service/GraphServer.cpp index 25e6ee6cc5e..c9254d35295 100644 --- a/src/graph/service/GraphServer.cpp +++ b/src/graph/service/GraphServer.cpp @@ -7,6 +7,7 @@ #include #include +#include "common/id/Snowflake.h" #include "graph/service/GraphFlags.h" #include "graph/service/GraphService.h" namespace nebula { @@ -39,6 +40,8 @@ bool GraphServer::start() { return false; } + nebula::Snowflake::initWorkerId(interface->metaClient_.get()); + graphThread_ = std::make_unique([&] { thriftServer_->setPort(localHost_.port); thriftServer_->setInterface(std::move(interface)); diff --git a/src/graph/service/GraphService.h b/src/graph/service/GraphService.h index bcb38993325..8e41dda4e2e 100644 --- a/src/graph/service/GraphService.h +++ b/src/graph/service/GraphService.h @@ -51,12 +51,13 @@ class GraphService final : public cpp2::GraphServiceSvIf { folly::Future future_verifyClientVersion( const cpp2::VerifyClientVersionReq& req) override; + std::unique_ptr metaClient_; + private: bool auth(const std::string& username, const std::string& password); std::unique_ptr sessionManager_; std::unique_ptr queryEngine_; - std::unique_ptr metaClient_; }; } // namespace graph diff --git a/src/interface/common.thrift b/src/interface/common.thrift index e9895ebb26a..a7e623c13c1 100644 --- a/src/interface/common.thrift +++ b/src/interface/common.thrift @@ -470,6 +470,8 @@ enum ErrorCode { E_WRITE_WRITE_CONFLICT = -3073, E_CLIENT_SERVER_INCOMPATIBLE = -3061, + // get worker id + E_WORKER_ID_FAILED = -3062, E_UNKNOWN = -8000, } (cpp.enum_strict) diff --git a/src/interface/meta.thrift b/src/interface/meta.thrift index 1ee7bbacb68..5ea37b17a81 100644 --- a/src/interface/meta.thrift +++ b/src/interface/meta.thrift @@ -506,6 +506,17 @@ struct GetPartsAllocResp { 4: optional map(cpp.template = "std::unordered_map") terms, } +// get workerid for snowflake +struct GetWorkerIdReq { + 1: binary host, +} + +struct GetWorkerIdResp { + 1: common.ErrorCode code, + 2: common.HostAddr leader, + 3: i64 workerid, +} + struct MultiPutReq { // segment is used to avoid conflict with system data. // it should be comprised of numbers and letters. @@ -1206,6 +1217,8 @@ service MetaService { GetPartsAllocResp getPartsAlloc(1: GetPartsAllocReq req); ListPartsResp listParts(1: ListPartsReq req); + GetWorkerIdResp getWorkerId(1: GetWorkerIdReq req); + ExecResp multiPut(1: MultiPutReq req); GetResp get(1: GetReq req); MultiGetResp multiGet(1: MultiGetReq req); diff --git a/src/meta/CMakeLists.txt b/src/meta/CMakeLists.txt index 54873ae9c70..ee0025d35aa 100644 --- a/src/meta/CMakeLists.txt +++ b/src/meta/CMakeLists.txt @@ -27,6 +27,7 @@ nebula_add_library( processors/schema/ListEdgesProcessor.cpp processors/schema/DropEdgeProcessor.cpp processors/schema/SchemaUtil.cpp + processors/id/GetWorkerIdProcessor.cpp processors/index/CreateTagIndexProcessor.cpp processors/index/DropTagIndexProcessor.cpp processors/index/GetTagIndexProcessor.cpp diff --git a/src/meta/MetaServiceHandler.cpp b/src/meta/MetaServiceHandler.cpp index 997d38ed894..1ab14cd1f29 100644 --- a/src/meta/MetaServiceHandler.cpp +++ b/src/meta/MetaServiceHandler.cpp @@ -20,6 +20,7 @@ #include "meta/processors/config/ListConfigsProcessor.h" #include "meta/processors/config/RegConfigProcessor.h" #include "meta/processors/config/SetConfigProcessor.h" +#include "meta/processors/id/GetWorkerIdProcessor.h" #include "meta/processors/index/CreateEdgeIndexProcessor.h" #include "meta/processors/index/CreateTagIndexProcessor.h" #include "meta/processors/index/DropEdgeIndexProcessor.h" @@ -568,5 +569,11 @@ folly::Future MetaServiceHandler::future_verifyCl RETURN_FUTURE(processor); } +folly::Future MetaServiceHandler::future_getWorkerId( + const cpp2::GetWorkerIdReq& req) { + auto* processor = GetWorkerIdProcessor::instance(kvstore_); + RETURN_FUTURE(processor); +} + } // namespace meta } // namespace nebula diff --git a/src/meta/MetaServiceHandler.h b/src/meta/MetaServiceHandler.h index 4991bf731bc..3d9f7c7df20 100644 --- a/src/meta/MetaServiceHandler.h +++ b/src/meta/MetaServiceHandler.h @@ -239,6 +239,8 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf { folly::Future future_verifyClientVersion( const cpp2::VerifyClientVersionReq& req) override; + folly::Future future_getWorkerId(const cpp2::GetWorkerIdReq& req) override; + private: kvstore::KVStore* kvstore_ = nullptr; ClusterID clusterId_{0}; diff --git a/src/meta/processors/Common.h b/src/meta/processors/Common.h index 353fddacc85..a4a436dcda1 100644 --- a/src/meta/processors/Common.h +++ b/src/meta/processors/Common.h @@ -23,6 +23,7 @@ class LockUtils { GENERATE_LOCK(lastUpdateTime); GENERATE_LOCK(space); GENERATE_LOCK(id); + GENERATE_LOCK(workerId); GENERATE_LOCK(localId); GENERATE_LOCK(tag); GENERATE_LOCK(edge); diff --git a/src/meta/processors/id/GetWorkerIdProcessor.cpp b/src/meta/processors/id/GetWorkerIdProcessor.cpp new file mode 100644 index 00000000000..798240ab9a0 --- /dev/null +++ b/src/meta/processors/id/GetWorkerIdProcessor.cpp @@ -0,0 +1,53 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#include "meta/processors/id/GetWorkerIdProcessor.h" + +namespace nebula { +namespace meta { + +void GetWorkerIdProcessor::process(const cpp2::GetWorkerIdReq& req) { + const string& ipAddr = req.get_host(); + auto result = doGet(ipAddr); + if (nebula::ok(result)) { + int64_t workerId = std::stoi(std::move(nebula::value(result))); + + handleErrorCode(nebula::cpp2::ErrorCode::SUCCEEDED); + resp_.workerid_ref() = workerId; + onFinished(); + return; + } + + folly::SharedMutex::WriteHolder wHolder(LockUtils::workerIdLock()); + auto newResult = doGet(kIdKey); + if (!nebula::ok(newResult)) { + handleErrorCode(nebula::cpp2::ErrorCode::E_WORKER_ID_FAILED); + onFinished(); + return; + } + + int64_t workerId = std::stoi(std::move(nebula::value(newResult))); + // TODO: (jackwener) limit worker, add LOG ERROR + doPut(std::vector{{ipAddr, std::to_string(workerId + 1)}}); + + handleErrorCode(nebula::cpp2::ErrorCode::SUCCEEDED); + resp_.workerid_ref() = workerId; + onFinished(); +} + +void GetWorkerIdProcessor::doPut(std::vector data) { + folly::Baton baton; + kvstore_->asyncMultiPut(kDefaultSpaceId, + kDefaultPartId, + std::move(data), + [this, &baton](nebula::cpp2::ErrorCode code) { + this->handleErrorCode(code); + baton.post(); + }); + baton.wait(); +} + +} // namespace meta +} // namespace nebula diff --git a/src/meta/processors/id/GetWorkerIdProcessor.h b/src/meta/processors/id/GetWorkerIdProcessor.h new file mode 100644 index 00000000000..1b1433d81a2 --- /dev/null +++ b/src/meta/processors/id/GetWorkerIdProcessor.h @@ -0,0 +1,42 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#ifndef META_GETWORKERIDPROCESSOR_H_ +#define META_GETWORKERIDPROCESSOR_H_ + +#include "meta/processors/BaseProcessor.h" + +namespace nebula { +namespace meta { + +class GetWorkerIdProcessor : public BaseProcessor { + public: + static GetWorkerIdProcessor* instance(kvstore::KVStore* kvstore) { + return new GetWorkerIdProcessor(kvstore); + } + + void process(const cpp2::GetWorkerIdReq& req); + + private: + explicit GetWorkerIdProcessor(kvstore::KVStore* kvstore) + : BaseProcessor(kvstore) { + // initialize worker id in kvstore just once + static bool once = [this]() { + std::vector data = {{kIdKey, "0"}}; + doPut(data); + return true; + }(); + UNUSED(once); + } + + void doPut(std::vector data); + + inline static const string kIdKey = "snowflake_worker_id"; +}; + +} // namespace meta +} // namespace nebula + +#endif // META_GETWORKERIDPROCESSOR_H_ diff --git a/src/parser/parser.yy b/src/parser/parser.yy index a6b0dd93ff2..163cecf2e4f 100644 --- a/src/parser/parser.yy +++ b/src/parser/parser.yy @@ -27,7 +27,6 @@ #include "common/expression/ListComprehensionExpression.h" #include "common/expression/AggregateExpression.h" #include "common/function/FunctionManager.h" - #include "common/expression/ReduceExpression.h" #include "graph/util/ParserUtil.h" #include "graph/util/ExpressionUtils.h" @@ -686,6 +685,9 @@ expression | reduce_expression { $$ = $1; } + | uuid_expression { + $$ = $1; + } ; constant_expression @@ -1097,9 +1099,8 @@ function_call_expression ; uuid_expression - : KW_UUID L_PAREN STRING R_PAREN { - $$ = UUIDExpression::make(qctx->objPool(), *$3); - delete $3; + : KW_UUID L_PAREN R_PAREN { + $$ = UUIDExpression::make(qctx->objPool()); } ; diff --git a/src/parser/test/ParserTest.cpp b/src/parser/test/ParserTest.cpp index a40726b4525..89199e87359 100644 --- a/src/parser/test/ParserTest.cpp +++ b/src/parser/test/ParserTest.cpp @@ -1291,7 +1291,7 @@ TEST_F(ParserTest, FetchVertex) { ASSERT_TRUE(result.ok()) << result.status(); } { - std::string query = "FETCH PROP ON person uuid(\"Tom\")"; + std::string query = "FETCH PROP ON person uuid()"; auto result = parse(query); ASSERT_TRUE(result.ok()) << result.status(); } @@ -1744,7 +1744,7 @@ TEST_F(ParserTest, UnreservedKeywords) { ASSERT_TRUE(result.ok()) << result.status(); } { - std::string query = "GO FROM UUID(\"tom\") OVER guest WHERE $-.EMAIL"; + std::string query = "GO FROM UUID() OVER guest WHERE $-.EMAIL"; auto result = parse(query); ASSERT_TRUE(result.ok()) << result.status(); } @@ -1758,7 +1758,7 @@ TEST_F(ParserTest, UnreservedKeywords) { } { std::string query = - "GO FROM UUID(\"tom\") OVER like YIELD $$.tag1.EMAIL, like.users," + "GO FROM UUID() OVER like YIELD $$.tag1.EMAIL, like.users," "like._src, like._dst, like.type, $^.tag2.SPACE " "| ORDER BY $-.SPACE"; auto result = parse(query); @@ -1770,7 +1770,7 @@ TEST_F(ParserTest, UnreservedKeywords) { ASSERT_TRUE(result.ok()) << result.status(); } { - std::string query = "$var = GO FROM UUID(\"tom\") OVER like;GO FROM $var.SPACE OVER like"; + std::string query = "$var = GO FROM UUID() OVER like;GO FROM $var.SPACE OVER like"; auto result = parse(query); ASSERT_TRUE(result.ok()) << result.status(); }