Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add snowflake id #3500

Merged
merged 26 commits into from
Dec 30, 2021
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,7 @@ void MetaClient::getResponse(Request req,
return;
} else {
LOG(ERROR) << "Send request to " << host << ", exceed retry limit";
LOG(ERROR) << "RpcResponse exception: " << t.exception().what().c_str();
pro.setValue(
Status::Error("RPC failure in MetaClient: %s", t.exception().what().c_str()));
}
Expand Down Expand Up @@ -3501,6 +3502,21 @@ folly::Future<StatusOr<cpp2::ExecResp>> MetaClient::killQuery(
return future;
}

folly::Future<StatusOr<int64_t>> MetaClient::getWorkerId(std::string ipAddr) {
cpp2::GetWorkerIdReq req;
req.host_ref() = std::move(ipAddr);

folly::Promise<StatusOr<int64_t>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_getWorkerId(request); },
[](cpp2::GetWorkerIdResp&& resp) -> int64_t { return std::move(resp).get_workerid(); },
std::move(promise),
true);
return future;
}

folly::Future<StatusOr<bool>> MetaClient::download(const std::string& hdfsHost,
int32_t hdfsPort,
const std::string& hdfsPath,
Expand Down
6 changes: 6 additions & 0 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,8 @@ class MetaClient {

folly::Future<StatusOr<bool>> ingest(GraphSpaceID spaceId);

folly::Future<StatusOr<int64_t>> getWorkerId(std::string ipAddr);

HostAddr getMetaLeader() {
return leader_;
}
Expand All @@ -652,6 +654,10 @@ class MetaClient {
return heartbeatTime_;
}

std::string getLocalIp() {
return options_.localHost_.toString();
}

protected:
// Return true if load succeeded.
bool loadData();
Expand Down
1 change: 1 addition & 0 deletions src/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ nebula_add_subdirectory(utils)
nebula_add_subdirectory(ssl)
nebula_add_subdirectory(geo)
nebula_add_subdirectory(memory)
nebula_add_subdirectory(id)
18 changes: 4 additions & 14 deletions src/common/expression/UUIDExpression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,36 +10,26 @@
namespace nebula {

bool UUIDExpression::operator==(const Expression& rhs) const {
if (kind_ != rhs.kind()) {
return false;
}

const auto& r = static_cast<const UUIDExpression&>(rhs);
return field_ == r.field_;
return kind_ == rhs.kind();
}

void UUIDExpression::writeTo(Encoder& encoder) const {
// kind_
UNUSED(encoder);
encoder << kind_;

// field_
CHECK(!field_.empty());
encoder << field_;
}

void UUIDExpression::resetFrom(Decoder& decoder) {
// Read field_
field_ = decoder.readStr();
UNUSED(decoder);
}

const Value& UUIDExpression::eval(ExpressionContext& ctx) {
// TODO
UNUSED(ctx);
return result_;
}

std::string UUIDExpression::toString() const {
return folly::stringPrintf("uuid(\"%s\")", field_.c_str());
return folly::stringPrintf("uuid()");
}

void UUIDExpression::accept(ExprVisitor* visitor) {
Expand Down
10 changes: 4 additions & 6 deletions src/common/expression/UUIDExpression.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ class UUIDExpression final : public Expression {
friend class Expression;

public:
static UUIDExpression* make(ObjectPool* pool, const std::string& field = "") {
return pool->add(new UUIDExpression(pool, field));
static UUIDExpression* make(ObjectPool* pool) {
return pool->add(new UUIDExpression(pool));
}

bool operator==(const Expression& rhs) const override;
Expand All @@ -27,18 +27,16 @@ class UUIDExpression final : public Expression {
void accept(ExprVisitor* visitor) override;

Expression* clone() const override {
return UUIDExpression::make(pool_, field_);
return UUIDExpression::make(pool_);
}

private:
explicit UUIDExpression(ObjectPool* pool, const std::string& field = "")
: Expression(pool, Kind::kUUID), field_(field) {}
explicit UUIDExpression(ObjectPool* pool) : Expression(pool, Kind::kUUID) {}

void writeTo(Encoder& encoder) const override;
void resetFrom(Decoder& decoder) override;

private:
std::string field_;
Value result_;
};

Expand Down
2 changes: 1 addition & 1 deletion src/common/expression/test/EncodeDecodeTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ TEST(ExpressionEncodeDecode, TypeCastingExpression) {
}

TEST(ExpressionEncodeDecode, UUIDExpression) {
const auto& uuidEx = *UUIDExpression::make(&pool, "field");
const auto& uuidEx = *UUIDExpression::make(&pool);
std::string encoded = Expression::encode(uuidEx);
auto decoded = Expression::decode(&pool, folly::StringPiece(encoded.data(), encoded.size()));
EXPECT_EQ(uuidEx, *decoded);
Expand Down
2 changes: 1 addition & 1 deletion src/common/expression/test/ExpressionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ TEST_F(ExpressionTest, TestExprClone) {
auto fnCallExpr = FunctionCallExpression::make(&pool, "count", ArgumentList::make(&pool));
ASSERT_EQ(*fnCallExpr, *fnCallExpr->clone());

auto uuidExpr = UUIDExpression::make(&pool, "hello");
auto uuidExpr = UUIDExpression::make(&pool);
ASSERT_EQ(*uuidExpr, *uuidExpr->clone());

auto subExpr = SubscriptExpression::make(
Expand Down
11 changes: 11 additions & 0 deletions src/common/id/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Copyright (c) 2021 vesoft inc. All rights reserved.
#
# This source code is licensed under Apache 2.0 License,
# attached with Common Clause Condition 1.0, found in the LICENSES directory.

nebula_add_library(
snowflake_obj OBJECT
Snowflake.cpp
)

nebula_add_subdirectory(test)
64 changes: 64 additions & 0 deletions src/common/id/Snowflake.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#include "common/id/Snowflake.h"

namespace nebula {
void Snowflake::initWorkerId(meta::MetaClient* client) {
const std::string& ip = client->getLocalIp();
auto result = client->getWorkerId(ip).get();
if (!result.ok()) {
LOG(FATAL) << "Failed to get workerId from meta server";
}
workerId_ = result.value();
LOG(INFO) << "WorkerId init success: " << workerId_;
}

int64_t Snowflake::getId() {
std::lock_guard<std::mutex> guard(mutex_);

int64_t timestamp = getTimestamp();
if (timestamp < lastTimestamp_) {
LOG(ERROR) << "Clock back";
return firstBitRevert & getIdByTs(timestamp);
}

return getIdByTs(timestamp);
}

// get snowflake id by timestamp
// update lastTimestamp_ or sequence_
int64_t Snowflake::getIdByTs(int64_t timestamp) {
// if it is the same time, then the microsecond sequence
if (lastTimestamp_ == timestamp) {
sequence_ = (sequence_ + 1) & maxSequence;
// if the microsecond sequence overflow
if (sequence_ == 0) {
// block to the next millisecond, get the new timestamp
timestamp = nextTimestamp();
}
} else {
lastTimestamp_ = timestamp;
sequence_ = 0;
}
return (timestamp - startStmp) << timestampLeft | workerId_ << workerIdLeft | sequence_;
}

int64_t Snowflake::getTimestamp() {
return std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
}

// wait for the next millisecond
int64_t Snowflake::nextTimestamp() {
int64_t timestamp = getTimestamp();
while (timestamp <= lastTimestamp_) {
timestamp = getTimestamp();
}
return timestamp;
}

} // namespace nebula
62 changes: 62 additions & 0 deletions src/common/id/Snowflake.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#ifndef COMMON_ID_SNOWFLAKE_H_
#define COMMON_ID_SNOWFLAKE_H_

#include "clients/meta/MetaClient.h"
#include "common/base/Base.h"

namespace nebula {
class Snowflake {
FRIEND_TEST(SnowflakeTest, TestWorkerId);
FRIEND_TEST(SnowflakeTest, TestConcurrency);
friend size_t SnowflakeTest(size_t iters);
friend int64_t getSequence(int64_t id);
friend int64_t getWorkerId(int64_t id);
friend int64_t getTimestamp(int64_t id);

public:
Snowflake() = default;

static void initWorkerId(meta::MetaClient* client);

int64_t getId();

private:
static int64_t getTimestamp();

int64_t nextTimestamp();

int64_t getIdByTs(int64_t timestamp);

std::mutex mutex_;

/*
* Snowflake id:
* timestampBit 38 bits (8.6 years) |
* workerBit 13 bits (8k workerid) |
* sequenceBit 12 bits (4 million per second) |
*/
int64_t lastTimestamp_{-1};
static inline int64_t workerId_{0};
int64_t sequence_{0};

static constexpr int64_t startStmp = 1577808000000; // start time: 2020-01-01 00:00:00
static constexpr int64_t workerBit = 13; // worker id bit
static constexpr int64_t sequenceBit = 12; // sequence bit

static constexpr int64_t maxWorkerId = (1 << workerBit) - 1; // as worker id mask
static constexpr int64_t maxSequence = (1 << sequenceBit) - 1;

static constexpr int64_t workerIdLeft = sequenceBit; // workerId left bits
static constexpr int64_t timestampLeft = sequenceBit + workerBit;

static constexpr int64_t firstBitRevert = 0x9000000000000000; // revert the first bit
};

} // namespace nebula

#endif // COMMON_ID_SNOWFLAKE_H_
80 changes: 80 additions & 0 deletions src/common/id/test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Copyright (c) 2021 vesoft inc. All rights reserved.
#
# This source code is licensed under Apache 2.0 License,

nebula_add_executable(
NAME snowflake_bm
SOURCES SnowflakeBenchmark.cpp
OBJECTS
$<TARGET_OBJECTS:base_obj>
$<TARGET_OBJECTS:snowflake_obj>
$<TARGET_OBJECTS:meta_client_obj>
$<TARGET_OBJECTS:http_client_obj>
$<TARGET_OBJECTS:thrift_obj>
$<TARGET_OBJECTS:meta_obj>
$<TARGET_OBJECTS:ssl_obj>
$<TARGET_OBJECTS:conf_obj>
$<TARGET_OBJECTS:ws_obj>
$<TARGET_OBJECTS:ws_common_obj>
$<TARGET_OBJECTS:process_obj>
$<TARGET_OBJECTS:thread_obj>
$<TARGET_OBJECTS:version_obj>
$<TARGET_OBJECTS:file_based_cluster_id_man_obj>
$<TARGET_OBJECTS:meta_thrift_obj>
$<TARGET_OBJECTS:common_thrift_obj>
$<TARGET_OBJECTS:stats_obj>
$<TARGET_OBJECTS:datatypes_obj>
$<TARGET_OBJECTS:wkt_wkb_io_obj>
$<TARGET_OBJECTS:function_manager_obj>
$<TARGET_OBJECTS:agg_function_manager_obj>
$<TARGET_OBJECTS:time_utils_obj>
$<TARGET_OBJECTS:datetime_parser_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:fs_obj>
$<TARGET_OBJECTS:meta_client_stats_obj>
$<TARGET_OBJECTS:expression_obj>
LIBRARIES
follybenchmark
boost_regex
${PROXYGEN_LIBRARIES}
${THRIFT_LIBRARIES}
)


nebula_add_test(
NAME snowflake_test
SOURCES SnowflakeTest.cpp
OBJECTS
$<TARGET_OBJECTS:base_obj>
$<TARGET_OBJECTS:snowflake_obj>
$<TARGET_OBJECTS:meta_client_obj>
$<TARGET_OBJECTS:http_client_obj>
$<TARGET_OBJECTS:thrift_obj>
$<TARGET_OBJECTS:meta_obj>
$<TARGET_OBJECTS:ssl_obj>
$<TARGET_OBJECTS:conf_obj>
$<TARGET_OBJECTS:ws_obj>
$<TARGET_OBJECTS:ws_common_obj>
$<TARGET_OBJECTS:process_obj>
$<TARGET_OBJECTS:thread_obj>
$<TARGET_OBJECTS:version_obj>
$<TARGET_OBJECTS:file_based_cluster_id_man_obj>
$<TARGET_OBJECTS:meta_thrift_obj>
$<TARGET_OBJECTS:common_thrift_obj>
$<TARGET_OBJECTS:stats_obj>
$<TARGET_OBJECTS:datatypes_obj>
$<TARGET_OBJECTS:wkt_wkb_io_obj>
$<TARGET_OBJECTS:function_manager_obj>
$<TARGET_OBJECTS:agg_function_manager_obj>
$<TARGET_OBJECTS:time_utils_obj>
$<TARGET_OBJECTS:datetime_parser_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:fs_obj>
$<TARGET_OBJECTS:meta_client_stats_obj>
$<TARGET_OBJECTS:expression_obj>
LIBRARIES
gtest
gtest_main
${PROXYGEN_LIBRARIES}
${THRIFT_LIBRARIES}
)
Loading