Skip to content

Add IPv6 support #486

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions example/counter/run_client.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,16 @@ DEFINE_string crash_on_fatal 'true' 'Crash on fatal log'
DEFINE_string log_each_request 'false' 'Print log for each request'
DEFINE_string valgrind 'false' 'Run in valgrind'
DEFINE_string use_bthread "true" "Use bthread to send request"
DEFINE_string ip "" "IP address to use, or [ipv6]"

FLAGS "$@" || exit 1

# hostname prefers ipv6
IP=`hostname -i | awk '{print $NF}'`
if [ "$FLAGS_ip" = "" ] ; then
# hostname prefers ipv6
IP=`hostname -i | awk '{print $NF}'`
else
IP=$FLAGS_ip
fi

if [ "$FLAGS_valgrind" == "true" ] && [ $(which valgrind) ] ; then
VALGRIND="valgrind --tool=memcheck --leak-check=full"
Expand Down
10 changes: 8 additions & 2 deletions example/counter/run_server.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ DEFINE_integer max_segment_size '8388608' 'Max segment size'
DEFINE_integer server_num '3' 'Number of servers'
DEFINE_boolean clean 1 'Remove old "runtime" dir before running'
DEFINE_integer port 8100 "Port of the first server"
DEFINE_string ip "" "IP address to use, or [ipv6]"

# parse the command-line
FLAGS "$@" || exit 1
Expand All @@ -36,8 +37,12 @@ eval set -- "${FLAGS_ARGV}"
# The alias for printing to stderr
alias error=">&2 echo counter: "

# hostname prefers ipv6
IP=`hostname -i | awk '{print $NF}'`
if [ "$FLAGS_ip" = "" ] ; then
# hostname prefers ipv6
IP=`hostname -i | awk '{print $NF}'`
else
IP=$FLAGS_ip
fi

if [ "$FLAGS_valgrind" == "true" ] && [ $(which valgrind) ] ; then
VALGRIND="valgrind --tool=memcheck --leak-check=full"
Expand All @@ -62,6 +67,7 @@ for ((i=0; i<$FLAGS_server_num; ++i)); do
-bthread_concurrency=${FLAGS_bthread_concurrency}\
-crash_on_fatal_log=${FLAGS_crash_on_fatal} \
-raft_max_segment_size=${FLAGS_max_segment_size} \
-ip=${IP} \
-raft_sync=${FLAGS_sync} \
-port=$((${FLAGS_port}+i)) -conf="${raft_peers}" > std.log 2>&1 &
cd ../..
Expand Down
24 changes: 21 additions & 3 deletions example/counter/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ DEFINE_int32(snapshot_interval, 30, "Interval between each snapshot");
DEFINE_string(conf, "", "Initial configuration of the replication group");
DEFINE_string(data_path, "./data", "Path of data stored on");
DEFINE_string(group, "Counter", "Id of the replication group");
DEFINE_string(ip, "", "IP Address to use to serve requests");

namespace example {
class Counter;
Expand Down Expand Up @@ -74,6 +75,13 @@ class Counter : public braft::StateMachine {
// Starts this node
int start() {
butil::EndPoint addr(butil::my_ip(), FLAGS_port);
if (FLAGS_ip != "") {
std::string endpoint_str = FLAGS_ip + ":" + std::to_string(FLAGS_port);
if (0 != butil::str2endpoint(endpoint_str.c_str(), &addr)) {
LOG(ERROR) << "Fail to parse endpoint `" << endpoint_str << '\'';
return -1;
}
}
braft::NodeOptions node_options;
if (node_options.initial_conf.parse_from(FLAGS_conf) != 0) {
LOG(ERROR) << "Fail to parse configuration `" << FLAGS_conf << '\'';
Expand Down Expand Up @@ -377,9 +385,19 @@ int main(int argc, char* argv[]) {
// clients.
// Notice the default options of server is used here. Check out details from
// the doc of brpc if you would like change some options;
if (server.Start(FLAGS_port, NULL) != 0) {
LOG(ERROR) << "Fail to start Server";
return -1;
if (FLAGS_ip != "" && FLAGS_ip[0] == '[') {
butil::EndPoint ep;
butil::str2endpoint("[::]", FLAGS_port, &ep);
if (server.Start(ep, NULL) != 0) {
LOG(ERROR) << "Fail to start IPv6 Server";
return -1;
}

} else {
if (server.Start(FLAGS_port, NULL) != 0) {
LOG(ERROR) << "Fail to start Server";
return -1;
}
}

// It's ok to start Counter;
Expand Down
32 changes: 29 additions & 3 deletions src/braft/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,36 @@ struct PeerId {
bool is_witness() const {
return role == WITNESS;
}

void parse_ip_from_id(const std::string& str, size_t* end_pos) {
*end_pos = std::string::npos;
if (str.length() == 0) {
return;
}

if (str[0] == '[') {
*end_pos = str.find(']') + 1;
} else {
*end_pos = str.find(':');
}
}

int parse(const std::string& str) {
reset();
char ip_str[64];
int value = REPLICA;
if (2 > sscanf(str.c_str(), "%[^:]%*[:]%d%*[:]%d%*[:]%d", ip_str, &addr.port, &idx, &value)) {

size_t end_pos = std::string::npos;
parse_ip_from_id(str, &end_pos);

if (end_pos == std::string::npos) {
reset();
return -1;
}

std::string ip_str = str.substr(0, end_pos);
int port;

if (1 > sscanf(str.c_str() + end_pos, "%*[:]%d%*[:]%d%*[:]%d", &port, &idx, &value)) {
reset();
return -1;
}
Expand All @@ -84,7 +109,8 @@ struct PeerId {
reset();
return -1;
}
if (0 != butil::str2ip(ip_str, &addr.ip)) {
std::string end_point_str = ip_str + ":" + std::to_string(port);
if (0 != butil::str2endpoint(end_point_str.c_str(), &addr)) {
reset();
return -1;
}
Expand Down
2 changes: 1 addition & 1 deletion src/braft/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ int NodeImpl::init(const NodeOptions& options) {
_options = options;

// check _server_id
if (butil::IP_ANY == _server_id.addr.ip) {
if (butil::IP_ANY == _server_id.addr.ip && !butil::is_endpoint_extended(_server_id.addr)) {
LOG(ERROR) << "Group " << _group_id
<< " Node can't started from IP_ANY";
return -1;
Expand Down
15 changes: 12 additions & 3 deletions src/braft/node_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,18 @@ NodeManager::~NodeManager() {}

bool NodeManager::server_exists(butil::EndPoint addr) {
BAIDU_SCOPED_LOCK(_mutex);
if (addr.ip != butil::IP_ANY) {
butil::EndPoint any_addr(butil::IP_ANY, addr.port);

int port = addr.port;

if (butil::is_endpoint_extended(addr)) {
// No easy way of getting just the port for now in Brpc
std::string str(butil::endpoint2str(addr).c_str());
std::string port_str = str.substr(str.rfind(':') + 1);
port = std::stoi(port_str);
}

if (addr.ip != butil::IP_ANY || butil::is_endpoint_extended(addr)) {
butil::EndPoint any_addr(butil::IP_ANY, port);
if (_addr_set.find(any_addr) != _addr_set.end()) {
return true;
}
Expand Down Expand Up @@ -166,4 +176,3 @@ void NodeManager::get_all_nodes(std::vector<scoped_refptr<NodeImpl> >* nodes) {
}

} // namespace braft

27 changes: 27 additions & 0 deletions test/test_configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,35 @@ TEST_F(TestUsageSuits, PeerId) {

braft::PeerId id3("1.2.3.4:1000:0");
LOG(INFO) << "id:" << id3;

}

TEST_F(TestUsageSuits, BadIPv4) {
braft::PeerId id1(id1);
ASSERT_EQ(-1, id1.parse("[1.1]:1000:0"));
LOG(INFO) << "id: "<< id1.to_string();
}


TEST_F(TestUsageSuits, IPv6) {
braft::PeerId id1(id1);
ASSERT_EQ(0, id1.parse("[::1]:1000:0"));
ASSERT_TRUE(id1.to_string() == "[::1]:1000:0:0");

ASSERT_EQ(0, id1.parse("[::1]:1000:0:1"));
ASSERT_TRUE(id1.to_string() == "[::1]:1000:0:1");

LOG(INFO) << "id: "<< id1.to_string();
}

TEST_F(TestUsageSuits, BadIPv6) {
braft::PeerId id1(id1);
ASSERT_EQ(-1, id1.parse("[1.1.1.1]:1000:0"));
ASSERT_EQ(-1, id1.parse("[zzzz::]:1000:0"));
LOG(INFO) << "id: "<< id1.to_string();
}


TEST_F(TestUsageSuits, Configuration) {
braft::Configuration conf1;
ASSERT_TRUE(conf1.empty());
Expand Down