Skip to content

Commit

Permalink
[EventEngine] Fix issues found when enabling event_engine_dns exper…
Browse files Browse the repository at this point in the history
…iment in OSS (grpc#35530)

Using `AF_UNSPEC` for both IPv4 and IPv6 queries does not work in all cases. Specifically, for `localhost:<>`, c-ares only returns the IPv6 record i.e. `::1`.

<!--

If you know who should review your pull request, please assign it to that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the appropriate
lang label.

-->

Closes grpc#35530

COPYBARA_INTEGRATE_REVIEW=grpc#35530 from yijiem:enable-oss-ee-dns-posix 452b5a2
PiperOrigin-RevId: 599989537
  • Loading branch information
yijiem authored and copybara-github committed Jan 20, 2024
1 parent f1254a7 commit 5bf0971
Show file tree
Hide file tree
Showing 14 changed files with 143 additions and 71 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions build_autogenerated.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions grpc.gyp

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -2568,7 +2568,6 @@ grpc_cc_library(
],
deps = [
"iomgr_port",
"ref_counted_dns_resolver_interface",
"useful",
"//:event_engine_base_hdrs",
"//:gpr",
Expand Down Expand Up @@ -2599,6 +2598,7 @@ grpc_cc_library(
"absl/strings:str_format",
"absl/types:optional",
"absl/types:variant",
"address_sorting",
"cares",
],
deps = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@ void RegisterDnsResolver(CoreConfiguration::Builder* builder) {
std::make_unique<EventEngineClientChannelDNSResolverFactory>());
return;
#endif
#ifndef GRPC_DO_NOT_INSTANTIATE_POSIX_POLLER
if (IsEventEngineDnsEnabled()) {
gpr_log(GPR_DEBUG, "Using EventEngine dns resolver");
builder->resolver_registry()->RegisterResolverFactory(
std::make_unique<EventEngineClientChannelDNSResolverFactory>());
return;
}
#endif
auto resolver = ConfigVars::Get().DnsResolver();
// ---- Ares resolver ----
if (ShouldUseAresDnsResolver(resolver)) {
Expand Down
165 changes: 106 additions & 59 deletions src/core/lib/event_engine/ares_resolver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

#if GRPC_ARES == 1

#include <address_sorting/address_sorting.h>
#include <ares.h>

#if ARES_VERSION >= 0x011200
Expand Down Expand Up @@ -144,6 +145,28 @@ absl::Status SetRequestDNSServer(absl::string_view dns_server,
return absl::OkStatus();
}

std::vector<EventEngine::ResolvedAddress> SortAddresses(
const std::vector<EventEngine::ResolvedAddress>& addresses) {
address_sorting_sortable* sortables = static_cast<address_sorting_sortable*>(
gpr_zalloc(sizeof(address_sorting_sortable) * addresses.size()));
for (size_t i = 0; i < addresses.size(); i++) {
sortables[i].user_data =
const_cast<EventEngine::ResolvedAddress*>(&addresses[i]);
memcpy(&sortables[i].dest_addr.addr, addresses[i].address(),
addresses[i].size());
sortables[i].dest_addr.len = addresses[i].size();
}
address_sorting_rfc_6724_sort(sortables, addresses.size());
std::vector<EventEngine::ResolvedAddress> sorted_addresses;
sorted_addresses.reserve(addresses.size());
for (size_t i = 0; i < addresses.size(); ++i) {
sorted_addresses.emplace_back(
*static_cast<EventEngine::ResolvedAddress*>(sortables[i].user_data));
}
gpr_free(sortables);
return sorted_addresses;
}

struct QueryArg {
QueryArg(AresResolver* ar, int id, absl::string_view name)
: ares_resolver(ar), callback_map_id(id), query_name(name) {}
Expand All @@ -156,6 +179,9 @@ struct HostnameQueryArg : public QueryArg {
HostnameQueryArg(AresResolver* ar, int id, absl::string_view name, int p)
: QueryArg(ar, id, name), port(p) {}
int port;
int pending_requests;
absl::Status error_status;
std::vector<EventEngine::ResolvedAddress> result;
};

} // namespace
Expand Down Expand Up @@ -291,9 +317,16 @@ void AresResolver::LookupHostname(
callback_map_.emplace(++id_, std::move(callback));
auto* resolver_arg = new HostnameQueryArg(this, id_, name, port);
if (IsIpv6LoopbackAvailable()) {
ares_gethostbyname(channel_, std::string(host).c_str(), AF_UNSPEC,
// Note that using AF_UNSPEC for both IPv6 and IPv4 queries does not work in
// all cases, e.g. for localhost:<> it only gets back the IPv6 result (i.e.
// ::1).
resolver_arg->pending_requests = 2;
ares_gethostbyname(channel_, std::string(host).c_str(), AF_INET,
&AresResolver::OnHostbynameDoneLocked, resolver_arg);
ares_gethostbyname(channel_, std::string(host).c_str(), AF_INET6,
&AresResolver::OnHostbynameDoneLocked, resolver_arg);
} else {
resolver_arg->pending_requests = 1;
ares_gethostbyname(channel_, std::string(host).c_str(), AF_INET,
&AresResolver::OnHostbynameDoneLocked, resolver_arg);
}
Expand Down Expand Up @@ -548,74 +581,88 @@ void AresResolver::OnAresBackupPollAlarm() {
void AresResolver::OnHostbynameDoneLocked(void* arg, int status,
int /*timeouts*/,
struct hostent* hostent) {
std::unique_ptr<HostnameQueryArg> hostname_qa(
static_cast<HostnameQueryArg*>(arg));
auto* hostname_qa = static_cast<HostnameQueryArg*>(arg);
GPR_ASSERT(hostname_qa->pending_requests-- > 0);
auto* ares_resolver = hostname_qa->ares_resolver;
auto nh = ares_resolver->callback_map_.extract(hostname_qa->callback_map_id);
GPR_ASSERT(!nh.empty());
GPR_ASSERT(
absl::holds_alternative<EventEngine::DNSResolver::LookupHostnameCallback>(
nh.mapped()));
auto callback = absl::get<EventEngine::DNSResolver::LookupHostnameCallback>(
std::move(nh.mapped()));
if (status != ARES_SUCCESS) {
std::string error_msg =
absl::StrFormat("address lookup failed for %s: %s",
hostname_qa->query_name, ares_strerror(status));
GRPC_ARES_RESOLVER_TRACE_LOG("resolver:%p OnHostbynameDoneLocked: %s",
ares_resolver, error_msg.c_str());
ares_resolver->event_engine_->Run(
[callback = std::move(callback),
status = AresStatusToAbslStatus(status, error_msg)]() mutable {
callback(status);
});
return;
}
GRPC_ARES_RESOLVER_TRACE_LOG(
"resolver:%p OnHostbynameDoneLocked name=%s ARES_SUCCESS", ares_resolver,
hostname_qa->query_name.c_str());
std::vector<EventEngine::ResolvedAddress> result;
for (size_t i = 0; hostent->h_addr_list[i] != nullptr; i++) {
switch (hostent->h_addrtype) {
case AF_INET6: {
size_t addr_len = sizeof(struct sockaddr_in6);
struct sockaddr_in6 addr;
memset(&addr, 0, addr_len);
memcpy(&addr.sin6_addr, hostent->h_addr_list[i],
sizeof(struct in6_addr));
addr.sin6_family = static_cast<unsigned char>(hostent->h_addrtype);
addr.sin6_port = htons(hostname_qa->port);
result.emplace_back(reinterpret_cast<const sockaddr*>(&addr), addr_len);
char output[INET6_ADDRSTRLEN];
ares_inet_ntop(AF_INET6, &addr.sin6_addr, output, INET6_ADDRSTRLEN);
GRPC_ARES_RESOLVER_TRACE_LOG(
"resolver:%p c-ares resolver gets a AF_INET6 result: \n"
" addr: %s\n port: %d\n sin6_scope_id: %d\n",
ares_resolver, output, hostname_qa->port, addr.sin6_scope_id);
break;
}
case AF_INET: {
size_t addr_len = sizeof(struct sockaddr_in);
struct sockaddr_in addr;
memset(&addr, 0, addr_len);
memcpy(&addr.sin_addr, hostent->h_addr_list[i], sizeof(struct in_addr));
addr.sin_family = static_cast<unsigned char>(hostent->h_addrtype);
addr.sin_port = htons(hostname_qa->port);
result.emplace_back(reinterpret_cast<const sockaddr*>(&addr), addr_len);
char output[INET_ADDRSTRLEN];
ares_inet_ntop(AF_INET, &addr.sin_addr, output, INET_ADDRSTRLEN);
GRPC_ARES_RESOLVER_TRACE_LOG(
"resolver:%p c-ares resolver gets a AF_INET result: \n"
" addr: %s\n port: %d\n",
ares_resolver, output, hostname_qa->port);
break;
hostname_qa->error_status = AresStatusToAbslStatus(status, error_msg);
} else {
GRPC_ARES_RESOLVER_TRACE_LOG(
"resolver:%p OnHostbynameDoneLocked name=%s ARES_SUCCESS",
ares_resolver, hostname_qa->query_name.c_str());
for (size_t i = 0; hostent->h_addr_list[i] != nullptr; i++) {
switch (hostent->h_addrtype) {
case AF_INET6: {
size_t addr_len = sizeof(struct sockaddr_in6);
struct sockaddr_in6 addr;
memset(&addr, 0, addr_len);
memcpy(&addr.sin6_addr, hostent->h_addr_list[i],
sizeof(struct in6_addr));
addr.sin6_family = static_cast<unsigned char>(hostent->h_addrtype);
addr.sin6_port = htons(hostname_qa->port);
hostname_qa->result.emplace_back(
reinterpret_cast<const sockaddr*>(&addr), addr_len);
char output[INET6_ADDRSTRLEN];
ares_inet_ntop(AF_INET6, &addr.sin6_addr, output, INET6_ADDRSTRLEN);
GRPC_ARES_RESOLVER_TRACE_LOG(
"resolver:%p c-ares resolver gets a AF_INET6 result: \n"
" addr: %s\n port: %d\n sin6_scope_id: %d\n",
ares_resolver, output, hostname_qa->port, addr.sin6_scope_id);
break;
}
case AF_INET: {
size_t addr_len = sizeof(struct sockaddr_in);
struct sockaddr_in addr;
memset(&addr, 0, addr_len);
memcpy(&addr.sin_addr, hostent->h_addr_list[i],
sizeof(struct in_addr));
addr.sin_family = static_cast<unsigned char>(hostent->h_addrtype);
addr.sin_port = htons(hostname_qa->port);
hostname_qa->result.emplace_back(
reinterpret_cast<const sockaddr*>(&addr), addr_len);
char output[INET_ADDRSTRLEN];
ares_inet_ntop(AF_INET, &addr.sin_addr, output, INET_ADDRSTRLEN);
GRPC_ARES_RESOLVER_TRACE_LOG(
"resolver:%p c-ares resolver gets a AF_INET result: \n"
" addr: %s\n port: %d\n",
ares_resolver, output, hostname_qa->port);
break;
}
default:
grpc_core::Crash(
absl::StrFormat("resolver:%p Received invalid type of address %d",
ares_resolver, hostent->h_addrtype));
}
}
}
ares_resolver->event_engine_->Run(
[callback = std::move(callback), result = std::move(result)]() mutable {
callback(std::move(result));
});
if (hostname_qa->pending_requests == 0) {
auto nh =
ares_resolver->callback_map_.extract(hostname_qa->callback_map_id);
GPR_ASSERT(!nh.empty());
GPR_ASSERT(absl::holds_alternative<
EventEngine::DNSResolver::LookupHostnameCallback>(nh.mapped()));
auto callback = absl::get<EventEngine::DNSResolver::LookupHostnameCallback>(
std::move(nh.mapped()));
if (!hostname_qa->result.empty() || hostname_qa->error_status.ok()) {
ares_resolver->event_engine_->Run(
[callback = std::move(callback),
result = SortAddresses(hostname_qa->result)]() mutable {
callback(std::move(result));
});
} else {
ares_resolver->event_engine_->Run(
[callback = std::move(callback),
result = std::move(hostname_qa->error_status)]() mutable {
callback(std::move(result));
});
}
delete hostname_qa;
}
}

void AresResolver::OnSRVQueryDoneLocked(void* arg, int status, int /*timeouts*/,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,12 @@

#include <grpc/event_engine/event_engine.h>

#include "src/core/lib/event_engine/ref_counted_dns_resolver_interface.h"

namespace grpc_event_engine {
namespace experimental {

// An asynchronous DNS resolver which uses the native platform's getaddrinfo
// API. Only supports A/AAAA records.
class NativePosixDNSResolver : public RefCountedDNSResolverInterface {
class NativePosixDNSResolver : public EventEngine::DNSResolver {
public:
explicit NativePosixDNSResolver(std::shared_ptr<EventEngine> event_engine);

Expand All @@ -48,8 +46,6 @@ class NativePosixDNSResolver : public RefCountedDNSResolverInterface {
void LookupTXT(EventEngine::DNSResolver::LookupTXTCallback on_resolved,
absl::string_view name) override;

void Orphan() override { delete this; }

private:
std::shared_ptr<EventEngine> event_engine_;
};
Expand Down
3 changes: 1 addition & 2 deletions src/core/lib/event_engine/posix_engine/posix_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -573,8 +573,7 @@ PosixEventEngine::GetDNSResolver(
}
GRPC_EVENT_ENGINE_DNS_TRACE(
"PosixEventEngine:%p creating NativePosixDNSResolver", this);
return std::make_unique<PosixEventEngine::PosixDNSResolver>(
grpc_core::MakeOrphanable<NativePosixDNSResolver>(shared_from_this()));
return std::make_unique<NativePosixDNSResolver>(shared_from_this());
#endif // GRPC_POSIX_SOCKET_RESOLVE_ADDRESS
}

Expand Down
1 change: 1 addition & 0 deletions test/core/end2end/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ grpc_cc_test(
"//src/core:closure",
"//src/core:default_event_engine",
"//src/core:error",
"//src/core:experiments",
"//src/core:grpc_sockaddr",
"//src/core:iomgr_fwd",
"//src/core:resolved_address",
Expand Down
8 changes: 8 additions & 0 deletions test/core/end2end/goaway_server_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/closure.h"
Expand Down Expand Up @@ -207,6 +208,13 @@ static void my_cancel_ares_request(grpc_ares_request* request) {
}

int main(int argc, char** argv) {
// TODO(yijiem): rewrite this test with a custom EventEngine DNS Resolver
if (grpc_core::IsEventEngineDnsEnabled()) {
gpr_log(
GPR_ERROR,
"Skipping iomgr-specific DNS test because EventEngine DNS is enabled");
return 0;
}
grpc_completion_queue* cq;
grpc_op ops[6];
grpc_op* op;
Expand Down
1 change: 1 addition & 0 deletions test/core/event_engine/fuzzing_event_engine/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ grpc_cc_library(
":fuzzing_event_engine_proto",
"//:event_engine_base_hdrs",
"//src/core:default_event_engine",
"//src/core:native_posix_dns_resolver",
"//src/core:time",
"//test/core/util:grpc_test_util",
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,15 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/port.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h"
#include "test/core/util/port.h"

#if defined(GRPC_POSIX_SOCKET_TCP)
#include "src/core/lib/event_engine/posix_engine/native_posix_dns_resolver.h"
#endif
// IWYU pragma: no_include <sys/socket.h>

extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type);
Expand Down Expand Up @@ -497,7 +502,11 @@ bool FuzzingEventEngine::IsWorkerThread() { abort(); }

absl::StatusOr<std::unique_ptr<EventEngine::DNSResolver>>
FuzzingEventEngine::GetDNSResolver(const DNSResolver::ResolverOptions&) {
abort();
#if defined(GRPC_POSIX_SOCKET_TCP)
return std::make_unique<NativePosixDNSResolver>(shared_from_this());
#else
grpc_core::Crash("FuzzingEventEngine::GetDNSResolver Not implemented");
#endif
}

void FuzzingEventEngine::Run(Closure* closure) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ namespace experimental {

// EventEngine implementation to be used by fuzzers.
// It's only allowed to have one FuzzingEventEngine instantiated at a time.
class FuzzingEventEngine
: public EventEngine,
public std::enable_shared_from_this<FuzzingEventEngine> {
class FuzzingEventEngine : public EventEngine {
public:
struct Options {
Duration max_delay_run_after = std::chrono::seconds(30);
Expand Down
8 changes: 8 additions & 0 deletions test/core/event_engine/test_suite/tests/dns_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,15 @@ TEST_F(EventEngineDNSTest, LocalHost) {
auto dns_resolver = CreateDNSResolverWithoutSpecifyingServer();
dns_resolver->LookupHostname(
[this](auto result) {
#ifdef GRPC_IOS_EVENT_ENGINE_CLIENT
EXPECT_SUCCESS();
#else
EXPECT_TRUE(result.ok());
EXPECT_THAT(*result,
Pointwise(ResolvedAddressEq(),
{*URIToResolvedAddress("ipv6:[::1]:1"),
*URIToResolvedAddress("ipv4:127.0.0.1:1")}));
#endif // GRPC_IOS_EVENT_ENGINE_CLIENT
dns_resolver_signal_.Notify();
},
"localhost:1", "");
Expand Down

0 comments on commit 5bf0971

Please sign in to comment.