Skip to content

Commit

Permalink
Merge pull request #5857 from CurtizJ/dns-cache
Browse files Browse the repository at this point in the history
Implement dns cache with asynchronous update.
  • Loading branch information
alexey-milovidov authored Jul 5, 2019
2 parents 8b62c96 + f3ac23f commit 2469ec1
Show file tree
Hide file tree
Showing 12 changed files with 267 additions and 133 deletions.
7 changes: 5 additions & 2 deletions dbms/programs/server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -525,8 +525,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
}
else
{
/// Initialize a watcher updating DNS cache in case of network errors
dns_cache_updater = std::make_unique<DNSCacheUpdater>(*global_context);
/// Initialize a watcher periodically updating DNS cache
dns_cache_updater = std::make_unique<DNSCacheUpdater>(*global_context, config().getInt("dns_cache_update_period", 15));
}

#if defined(__linux__)
Expand Down Expand Up @@ -773,6 +773,8 @@ int Server::main(const std::vector<std::string> & /*args*/)

main_config_reloader->start();
users_config_reloader->start();
if (dns_cache_updater)
dns_cache_updater->start();

{
std::stringstream message;
Expand Down Expand Up @@ -823,6 +825,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
log, "Closed connections." << (current_connections ? " But " + toString(current_connections) + " remains."
" Tip: To increase wait time add to config: <shutdown_wait_unfinished>60</shutdown_wait_unfinished>" : ""));

dns_cache_updater.reset();
main_config_reloader.reset();
users_config_reloader.reset();
});
Expand Down
101 changes: 93 additions & 8 deletions dbms/src/Common/DNSResolver.cpp
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
#include "DNSResolver.h"
#include <common/SimpleCache.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <Core/Names.h>
#include <Core/Types.h>
#include <Poco/Net/DNS.h>
#include <Poco/Net/NetException.h>
#include <Poco/NumberParser.h>
#include <Poco/Logger.h>
#include <common/logger_useful.h>
#include <arpa/inet.h>
#include <atomic>
#include <optional>

/// Counter defined elsewhere; DNSResolver::updateCache() increments it for every host
/// whose re-resolution fails with a Poco::Net::NetException.
namespace ProfileEvents
{
extern Event NetworkErrors;
}


namespace DB
{
Expand Down Expand Up @@ -67,23 +76,29 @@ static void splitHostAndPort(const std::string & host_and_port, std::string & ou
}
}


/// Resolves `host` (a name or a literal IP) to an IP address without consulting any cache.
static Poco::Net::IPAddress resolveIPAddressImpl(const std::string & host)
{
    /// NOTE: Poco::Net::DNS::resolveOne(host) doesn't work for IP addresses like 127.0.0.2,
    /// so the resolution goes through a SocketAddress built with a dummy port instead.
    const Poco::Net::SocketAddress address_with_dummy_port(host, 0U);
    return address_with_dummy_port.host();
}


/// Private state of DNSResolver, reached through DNSResolver::impl.
struct DNSResolver::Impl
{
/// Resolution results of resolveIPAddressImpl, looked up by host.
SimpleCache<decltype(resolveIPAddressImpl), &resolveIPAddressImpl> cache_host;

/// Held while touching new_hosts and host_name; updateCache() also holds it
/// while merging new_hosts into known_hosts.
std::mutex drop_mutex;
/// Held by updateCache() while it re-resolves known_hosts, and by dropCache().
std::mutex update_mutex;

/// Cached server host name
/// NOTE(review): `mutex` is locked right next to drop_mutex in dropCache()/getHostName();
/// it looks like the older lock that drop_mutex supersedes - confirm before relying on it.
std::mutex mutex;
std::optional<String> host_name;

/// Hosts that were asked to be resolved since the last update of the DNS cache.
NameSet new_hosts;

/// All hosts that have been asked to be resolved so far
/// (new_hosts is merged in by updateCache(); cleared by dropCache()).
NameSet known_hosts;

/// If disabled, will not make cache lookups, will resolve addresses manually on each call
std::atomic<bool> disable_cache{false};
};
Expand All @@ -93,28 +108,43 @@ DNSResolver::DNSResolver() : impl(std::make_unique<DNSResolver::Impl>()) {}

/// Resolves `host` to an IP address.
/// With the cache disabled every call resolves from scratch. Otherwise the host is recorded
/// in new_hosts (so that updateCache() will refresh it later) and the cached address is returned.
Poco::Net::IPAddress DNSResolver::resolveHost(const std::string & host)
{
    if (impl->disable_cache)
        return resolveIPAddressImpl(host);

    /// Must happen before returning: a stale unconditional return used to precede this,
    /// which left the host unregistered and never refreshed.
    addToNewHosts(host);
    return impl->cache_host(host);
}

/// Resolves a "host:port" string to a socket address.
/// With the cache disabled the string is handed to Poco::Net::SocketAddress as is. Otherwise it is
/// split, the host part is recorded for later refreshes and resolved through the cache.
Poco::Net::SocketAddress DNSResolver::resolveAddress(const std::string & host_and_port)
{
    if (impl->disable_cache)
        return Poco::Net::SocketAddress(host_and_port);

    String host;
    UInt16 port;
    splitHostAndPort(host_and_port, host, port);

    /// Register the host before the single return below; a stale unconditional return
    /// used to sit here and made this registration unreachable.
    addToNewHosts(host);
    return Poco::Net::SocketAddress(impl->cache_host(host), port);
}

/// Resolves `host` and combines the result with `port` into a socket address.
/// With the cache disabled Poco resolves the host on every call. Otherwise the host is recorded
/// for later refreshes by updateCache() and resolved through the cache.
Poco::Net::SocketAddress DNSResolver::resolveAddress(const std::string & host, UInt16 port)
{
    if (impl->disable_cache)
        return Poco::Net::SocketAddress(host, port);

    /// Reachable only now that the stale unconditional return ahead of it is gone.
    addToNewHosts(host);
    return Poco::Net::SocketAddress(impl->cache_host(host), port);
}

/// Drops every cached resolution, forgets all tracked hosts and resets the cached server host name.
void DNSResolver::dropCache()
{
    impl->cache_host.drop();

    /// Both mutexes are needed: update_mutex is held by updateCache() while it walks known_hosts,
    /// drop_mutex by everything that touches new_hosts / host_name. std::scoped_lock acquires
    /// the pair with deadlock avoidance. (A second `lock` on the old `mutex` used to be declared
    /// here as well, which redeclared the same name in one scope.)
    std::scoped_lock lock(impl->update_mutex, impl->drop_mutex);

    impl->known_hosts.clear();
    impl->new_hosts.clear();
    impl->host_name.reset();
}

Expand All @@ -128,14 +158,69 @@ String DNSResolver::getHostName()
if (impl->disable_cache)
return Poco::Net::DNS::hostName();

std::unique_lock lock(impl->mutex);
std::lock_guard lock(impl->drop_mutex);

if (!impl->host_name.has_value())
impl->host_name.emplace(Poco::Net::DNS::hostName());

return *impl->host_name;
}

/// Re-resolves every known host and refreshes the cached server host name.
/// Returns true if the IP of at least one host has changed.
/// Hosts that fail with a Poco::Net::NetException are counted in ProfileEvents::NetworkErrors
/// and reported in a single log line; any other exception is logged and the loop continues.
bool DNSResolver::updateCache()
{
    /// Held for the whole call. known_hosts is extended in the first step and iterated in the
    /// second; taking update_mutex only for the second step would let a concurrent updateCache()
    /// insert into the set while this call is iterating over it.
    std::lock_guard update_lock(impl->update_mutex);

    {
        std::lock_guard drop_lock(impl->drop_mutex);

        /// Set elements are const, so they can only be copied (std::move here would silently copy too).
        for (const auto & host : impl->new_hosts)
            impl->known_hosts.insert(host);
        impl->new_hosts.clear();

        impl->host_name.emplace(Poco::Net::DNS::hostName());
    }

    bool updated = false;
    String lost_hosts;
    for (const auto & host : impl->known_hosts)
    {
        try
        {
            updated |= updateHost(host);
        }
        catch (const Poco::Net::NetException &)
        {
            ProfileEvents::increment(ProfileEvents::NetworkErrors);

            if (!lost_hosts.empty())
                lost_hosts += ", ";
            lost_hosts += host;
        }
        catch (...)
        {
            tryLogCurrentException(__PRETTY_FUNCTION__);
        }
    }

    if (!lost_hosts.empty())
        LOG_INFO(&Logger::get("DNSResolver"), "Cached hosts not found: " << lost_hosts);

    return updated;
}

/// Refreshes the cached address of `host`.
/// Returns true if the address after the refresh differs from the one cached before it.
bool DNSResolver::updateHost(const String & host)
{
    /// The host is expected to be in the cache already, so this lookup adds no extra resolution.
    const auto previous_address = impl->cache_host(host);

    impl->cache_host.update(host);

    const auto refreshed_address = impl->cache_host(host);
    return previous_address != refreshed_address;
}

/// Remembers `host` so that the next updateCache() moves it into the set of known hosts.
void DNSResolver::addToNewHosts(const String & host)
{
    /// new_hosts is also touched by updateCache() and dropCache(), both under drop_mutex.
    std::lock_guard guard(impl->drop_mutex);

    auto & pending_hosts = impl->new_hosts;
    pending_hosts.insert(host);
}

/// Defined here rather than in the header: the header only forward-declares Impl,
/// which is held through std::unique_ptr<Impl>.
DNSResolver::~DNSResolver() = default;


Expand Down
17 changes: 14 additions & 3 deletions dbms/src/Common/DNSResolver.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
#include <memory>
#include <ext/singleton.h>
#include <Core/Types.h>
#include <Core/Names.h>


namespace DB
{

/// A singleton implementing DNS names resolving with optional permanent DNS cache
/// The cache could be updated only manually via drop() method
/// A singleton implementing DNS names resolving with optional DNS cache
/// The cache is updated asynchronously in a separate thread (see DNSCacheUpdater),
/// or it can be refreshed manually via the dropCache() method.
class DNSResolver : public ext::singleton<DNSResolver>
{
public:
Expand All @@ -34,16 +36,25 @@ class DNSResolver : public ext::singleton<DNSResolver>
/// Drops all caches
void dropCache();

/// Updates all known hosts in cache.
/// Returns true if IP of any host has been changed.
bool updateCache();

~DNSResolver();

protected:
private:

DNSResolver();

friend class ext::singleton<DNSResolver>;

struct Impl;
std::unique_ptr<Impl> impl;

/// Returns true if IP of host has been changed.
bool updateHost(const String & host);

void addToNewHosts(const String & host);
};

}
86 changes: 19 additions & 67 deletions dbms/src/Interpreters/DNSCacheUpdater.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,98 +2,50 @@
#include <Common/DNSResolver.h>
#include <Interpreters/Context.h>
#include <Core/BackgroundSchedulePool.h>
#include <Common/ProfileEvents.h>
#include <Poco/Net/NetException.h>
#include <common/logger_useful.h>


/// Counter defined elsewhere; incremented by DNSCacheUpdater::incrementNetworkErrorEventsIfNeeded().
namespace ProfileEvents
{
extern Event NetworkErrors;
}


namespace DB
{

/// Error codes that isNetworkError() treats as network failures.
namespace ErrorCodes
{
extern const int TIMEOUT_EXCEEDED;
extern const int ALL_CONNECTION_TRIES_FAILED;
}


/// Call it inside catch section: it rethrows the exception currently being handled to inspect its type.
/// Returns true if it is a network error, i.e. a DB exception with code TIMEOUT_EXCEEDED or
/// ALL_CONNECTION_TRIES_FAILED, a Poco DNS exception or a Poco timeout exception.
static bool isNetworkError()
{
    try
    {
        throw;
    }
    catch (const Exception & e)
    {
        const auto code = e.code();
        return code == ErrorCodes::TIMEOUT_EXCEEDED || code == ErrorCodes::ALL_CONNECTION_TRIES_FAILED;
    }
    catch (const Poco::Net::DNSException &)
    {
        return true;
    }
    catch (const Poco::TimeoutException &)
    {
        return true;
    }
    catch (...)
    {
        /// Anything else is not a network error.
        return false;
    }
}


/// Creates the background task that periodically refreshes the DNS cache.
/// `update_period_seconds_` is the target period between refreshes, in seconds.
/// The task is only created here; it does not run until start() is called.
/// NOTE(review): this is the two-argument form matching the call in Server.cpp; the older
/// one-argument signature that was fused into this definition has been dropped - confirm against the header.
DNSCacheUpdater::DNSCacheUpdater(Context & context_, Int32 update_period_seconds_)
    : context(context_),
    update_period_seconds(update_period_seconds_),
    pool(context_.getSchedulePool())
{
    /// The callback captures `this`; the destructor deactivates the task.
    task_handle = pool.createTask("DNSCacheUpdater", [this]{ run(); });
}

void DNSCacheUpdater::run()
{
auto num_current_network_exceptions = ProfileEvents::global_counters[ProfileEvents::NetworkErrors].load(std::memory_order_relaxed);
if (num_current_network_exceptions >= last_num_network_erros + min_errors_to_update_cache)
watch.restart();
auto & resolver = DNSResolver::instance();

/// Reload cluster config if IP of any host has been changed since last update.
if (resolver.updateCache())
{
LOG_INFO(&Poco::Logger::get("DNSCacheUpdater"),
"IPs of some hosts have been changed. Will reload cluster config.");
try
{
LOG_INFO(&Poco::Logger::get("DNSCacheUpdater"), "Updating DNS cache");

DNSResolver::instance().dropCache();
context.reloadClusterConfig();

last_num_network_erros = num_current_network_exceptions;
task_handle->scheduleAfter(min_update_period_seconds * 1000);
return;
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}

task_handle->scheduleAfter(10 * 1000);
auto interval_ms = std::max(0, update_period_seconds * 1000 - static_cast<Int32>(watch.elapsedMilliseconds()));
task_handle->scheduleAfter(interval_ms);
}

bool DNSCacheUpdater::incrementNetworkErrorEventsIfNeeded()
void DNSCacheUpdater::start()
{
if (isNetworkError())
{
ProfileEvents::increment(ProfileEvents::NetworkErrors);
return true;
}

return false;
task_handle->activateAndSchedule();
}

/// Deactivates the background task before the members are destroyed;
/// the task's callback captures `this` (see the constructor).
DNSCacheUpdater::~DNSCacheUpdater()
{
task_handle->deactivate();
}

}
Loading

0 comments on commit 2469ec1

Please sign in to comment.