Skip to content

Commit

Permalink
Resolve async cares conflicts.
Browse files Browse the repository at this point in the history
New dns requests are handled by the new servers, and the old servers closed when all requests will be resolved.
  • Loading branch information
emotional-engineering committed Apr 8, 2015
1 parent 264a8f3 commit 913765d
Show file tree
Hide file tree
Showing 6 changed files with 286 additions and 2 deletions.
1 change: 1 addition & 0 deletions deps/cares/build.mk
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ OBJS= \
src/bitncmp.o \
src/inet_net_pton.o \
src/inet_ntop.o \
src/ares_change_servers.o \

CFLAGS += -I. -I$(SRCDIR)/include -DHAVE_CONFIG_H

Expand Down
3 changes: 2 additions & 1 deletion deps/cares/cares.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@
'src/inet_ntop.c',
'src/ares_inet_net_pton.h',
'src/setup_once.h',
'src/windows_port.c'
'src/windows_port.c',
'src/ares_change_servers.c'
],
'conditions': [
[ 'library=="static_library"', {
Expand Down
4 changes: 4 additions & 0 deletions deps/cares/include/ares.h
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,10 @@ CARES_EXTERN int ares_set_servers_csv(ares_channel channel,

CARES_EXTERN int ares_get_servers(ares_channel channel,
struct ares_addr_node **servers);

CARES_EXTERN int ares_change_servers(ares_channel channel, struct ares_addr_node* source_servers);

CARES_EXTERN int ares_close_old_servers(ares_channel channel);

CARES_EXTERN const char *ares_inet_ntop(int af, const void *src, char *dst,
ares_socklen_t size);
Expand Down
263 changes: 263 additions & 0 deletions deps/cares/src/ares_change_servers.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@

#include "ares_setup.h"
#include "ares.h"
#include "ares_private.h"

int ares_rebuild_queries_list(ares_channel channel, int skip_first, int skip_last, bool is_shift);
int ares_close_old_servers(ares_channel channel);
int ares_server_shift(ares_channel channel);
int ares_finish_change_servers(ares_channel channel);

/**
* Add new servers to the channel
*/

int ares_change_servers(ares_channel channel, struct ares_addr_node* source_servers)
{

struct ares_addr_node *source_server;
int nservers;

struct server_state *server;

int new_servers_count = 0;

int i;

for (source_server = source_servers; source_server; source_server = source_server->next)
{
new_servers_count++;
}

nservers = channel->nservers + new_servers_count;

channel->servers = realloc(channel->servers, nservers * sizeof(struct server_state));

if (!channel->servers) {
return ARES_ENOMEM;
}

for (i = channel->nservers, source_server = source_servers; i < nservers; i++, source_server = source_server->next)
{

server = &channel->servers[i];

server->addr.family = source_server->family;

if (source_server->family == AF_INET)
memcpy(&channel->servers[i].addr.addrV4, &source_server->addrV4,
sizeof(source_server->addrV4));
else
memcpy(&channel->servers[i].addr.addrV6, &source_server->addrV6,
sizeof(source_server->addrV6));

/*
* Watch ares__init_servers_state()
*/

server->udp_socket = ARES_SOCKET_BAD;
server->tcp_socket = ARES_SOCKET_BAD;
server->tcp_connection_generation = ++channel->tcp_connection_generation;
server->tcp_lenbuf_pos = 0;
server->tcp_buffer_pos = 0;
server->tcp_buffer = NULL;
server->tcp_length = 0;
server->qhead = NULL;
server->qtail = NULL;
ares__init_list_head(&server->queries_to_server);
server->channel = channel;
server->is_broken = 0;

}

channel->nservers_change = channel->nservers; /* how many servers will be shifted */

/*
* First new server will be used for new dns requests
*/

channel->last_server = channel->nservers;
channel->nservers = nservers;

channel->rotation_state = channel->rotate; /* save rotation state */
channel->rotate = 0; /* turn off rotation, use one server */

/*
* Need to change references in all queries to new server->queries_to_server reference. Skip last new servers, because they have right references in lists
*/

ares_rebuild_queries_list(channel, 0, new_servers_count, false);

return ARES_SUCCESS;

}

/*
* Close old servers if they finish all queries.
* Call at the end of ares process pool.
*/

int ares_close_old_servers(ares_channel channel)
{

struct server_state *server;
int i;
int status;

if (channel->nservers_change < 1)
{
/*
* Now nothing is changing.
*/

return ARES_SUCCESS;
}

if (channel->servers)
{
for (i = 0; i < channel->last_server; i++)
{

server = &channel->servers[i];

if (ares__is_list_empty(&server->queries_to_server))
{

ares__close_sockets(channel, server);

status = ares_server_shift(channel);

if (status != ARES_SUCCESS)
{
return status;
}

ares_rebuild_queries_list(channel, i, 0, true);

if (channel->nservers_change > 0)
{
channel->nservers_change--;
}

i--;
}

}

if (channel->last_server == 0)
{
ares_finish_change_servers(channel);
}

}

return ARES_SUCCESS;

}

/*
* Remove first server from channel.
*/

int ares_server_shift(ares_channel channel)
{

int i;
int nservers = channel->nservers - 1;

struct server_state *servers_buff = malloc(nservers * sizeof(struct server_state));

if (!servers_buff) {
return ARES_ENOMEM;
}

for (i = 1; i < channel->nservers; i++)
{
servers_buff[i - 1] = channel->servers[i];
}

free(channel->servers);

channel->servers = servers_buff;
channel->nservers = nservers;
channel->last_server--;

return ARES_SUCCESS;

}

/*
* Rebuild links in linked lists server->queries_to_server. Used after channel->servers memory relocation
*/

int ares_rebuild_queries_list(ares_channel channel, int skip_first, int skip_last, bool is_shift)
{

struct server_state *server;

struct list_node* list_head;
struct list_node* list_node;

struct query *query;

int i;

for (i = 0; i < (channel->nservers - skip_last); i++)
{

server = &channel->servers[i];

list_head = &server->queries_to_server;

if (list_head->next->data == NULL)
{
/*
* "empty list" with old "next" and "prev" pointer to head. "list_head->next" same "list_head"
*/

ares__init_list_head(list_head);
}
else
{
/*
* Change next and previous link in neighbor nodes
*/

list_head->prev->next = list_head;
list_head->next->prev = list_head;

/*
* Servers can be shifted before query will be responsed. query save server id in query->server.
* Decrement query->server of each query of this server->queries_to_server.
* Is used after shift one server.
*/

if(is_shift && i >= skip_first)
{
for (list_node = list_head->next; list_node != list_head; list_node = list_node->next)
{
query = list_node->data;
query->server = query->server - 1;

}
}
}
}

return ARES_SUCCESS;

}

/*
* Restoration of the previous state of the channel.
*/

int ares_finish_change_servers(ares_channel channel)
{

channel->rotate = channel->rotation_state;
channel->rotation_state = -1;

return ARES_SUCCESS;

}
5 changes: 5 additions & 0 deletions deps/cares/src/ares_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,11 @@ struct ares_channeldata {

ares_sock_create_callback sock_create_cb;
void *sock_create_cb_data;

/* Providing gradual addition of servers */
int nservers_change; /* how many servers are changing now */
int rotation_state;

};

/* return true if now is exactly check time or later */
Expand Down
12 changes: 11 additions & 1 deletion src/cares_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ static void ares_timeout(uv_timer_t* handle) {
static void ares_poll_cb(uv_poll_t* watcher, int status, int events) {
ares_task_t* task = ContainerOf(&ares_task_t::poll_watcher, watcher);
Environment* env = task->env;

int status;

/* Reset the idle timer */
uv_timer_again(env->cares_timer_handle());
Expand All @@ -122,6 +124,14 @@ static void ares_poll_cb(uv_poll_t* watcher, int status, int events) {
ares_process_fd(env->cares_channel(),
events & UV_READABLE ? task->sock : ARES_SOCKET_BAD,
events & UV_WRITABLE ? task->sock : ARES_SOCKET_BAD);

status = ares_close_old_servers(env->cares_channel());

if (status != ARES_SUCCESS)
{
/* error */
}

}


Expand Down Expand Up @@ -1185,7 +1195,7 @@ static void SetServers(const FunctionCallbackInfo<Value>& args) {
}

if (err == 0)
err = ares_set_servers(env->cares_channel(), &servers[0]);
err = ares_change_servers(env->cares_channel(), &servers[0]); /* gradual addition of servers */
else
err = ARES_EBADSTR;

Expand Down

0 comments on commit 913765d

Please sign in to comment.