Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix udp trackers #115

Merged
merged 4 commits into from
Oct 19, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ config.status
confdefs.h
conftest.dir
configure
conftest.*
depcomp
install-sh
libtool
Expand Down
31 changes: 31 additions & 0 deletions rak/socket_address.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class socket_address {
static const int pf_local = PF_UNIX;
#endif

bool is_any() const;
bool is_valid() const;
bool is_bindable() const;
bool is_address_any() const;
Expand All @@ -93,6 +94,8 @@ class socket_address {
std::string address_str() const;
bool address_c_str(char* buf, socklen_t size) const;

std::string pretty_address_str() const;

// Attemts to set it as an inet, then an inet6 address. It will
// never set anything but net addresses, no local/unix.
bool set_address_str(const std::string& a) { return set_address_c_str(a.c_str()); }
Expand Down Expand Up @@ -233,6 +236,18 @@ class socket_address_inet6 {
struct sockaddr_in6 m_sockaddr;
};

inline bool
socket_address::is_any() const {
switch (family()) {
case af_inet:
return sa_inet()->is_any();
case af_inet6:
return sa_inet6()->is_any();
default:
return false;
}
}

inline bool
socket_address::is_valid() const {
switch (family()) {
Expand Down Expand Up @@ -317,6 +332,22 @@ socket_address::address_c_str(char* buf, socklen_t size) const {
}
}

inline std::string
socket_address::pretty_address_str() const {
switch (family()) {
case af_inet:
return sa_inet()->address_str();
case af_inet6:
return sa_inet6()->address_str();
default:
if (port() == 0)
return std::string("no family");
else
return std::string("no family with port");
}
}


inline bool
socket_address::set_address_c_str(const char* a) {
if (sa_inet()->set_address_c_str(a)) {
Expand Down
18 changes: 9 additions & 9 deletions src/torrent/tracker_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ TrackerList::send_state(Tracker* tracker, int new_event) {
tracker->send_state(new_event);
tracker->inc_request_counter();

LT_LOG_TRACKER(INFO, "Sending '%s' to group:%u url:'%s'.",
LT_LOG_TRACKER(INFO, "sending '%s (group:%u url:%s)",
option_as_string(OPTION_TRACKER_EVENT, new_event),
tracker->group(), tracker->url().c_str());
}
Expand All @@ -168,7 +168,7 @@ TrackerList::send_scrape(Tracker* tracker) {
tracker->send_scrape();
tracker->inc_request_counter();

LT_LOG_TRACKER(INFO, "Sending 'scrape' to group:%u url:'%s'.",
LT_LOG_TRACKER(INFO, "sending 'scrape' (group:%u url:%s)",
tracker->group(), tracker->url().c_str());
}

Expand Down Expand Up @@ -204,15 +204,15 @@ TrackerList::insert_url(unsigned int group, const std::string& url, bool extra_t
tracker = new TrackerDht(this, url, flags);

} else {
LT_LOG_TRACKER(WARN, "Could find matching tracker protocol for url: '%s'.", url.c_str());
LT_LOG_TRACKER(WARN, "could find matching tracker protocol (url:%s)", url.c_str());

if (extra_tracker)
throw torrent::input_error("Could find matching tracker protocol for url: '" + url + "'.");
throw torrent::input_error("could find matching tracker protocol (url:" + url + ")");

return;
}

LT_LOG_TRACKER(INFO, "Added tracker group:%i url:'%s'.", group, url.c_str());
LT_LOG_TRACKER(INFO, "added tracker (group:%i url:%s)", group, url.c_str());
insert(group, tracker);
}

Expand Down Expand Up @@ -331,7 +331,7 @@ TrackerList::receive_success(Tracker* tb, AddressList* l) {
l->sort();
l->erase(std::unique(l->begin(), l->end()), l->end());

LT_LOG_TRACKER(INFO, "Received %u peers from tracker url:'%s'.", l->size(), tb->url().c_str());
LT_LOG_TRACKER(INFO, "received %u peers (url:%s)", l->size(), tb->url().c_str());

tb->m_success_time_last = cachedTime.seconds();
tb->m_success_counter++;
Expand All @@ -348,7 +348,7 @@ TrackerList::receive_failed(Tracker* tb, const std::string& msg) {
if (itr == end() || tb->is_busy())
throw internal_error("TrackerList::receive_failed(...) called but the iterator is invalid.");

LT_LOG_TRACKER(INFO, "Failed to connect to tracker url:'%s' msg:'%s'.", tb->url().c_str(), msg.c_str());
LT_LOG_TRACKER(INFO, "failed to connect to tracker (url:%s msg:%s)", tb->url().c_str(), msg.c_str());

tb->m_failed_time_last = cachedTime.seconds();
tb->m_failed_counter++;
Expand All @@ -362,7 +362,7 @@ TrackerList::receive_scrape_success(Tracker* tb) {
if (itr == end() || tb->is_busy())
throw internal_error("TrackerList::receive_success(...) called but the iterator is invalid.");

LT_LOG_TRACKER(INFO, "Received scrape from tracker url:'%s'.", tb->url().c_str());
LT_LOG_TRACKER(INFO, "received scrape from tracker (url:%s)", tb->url().c_str());

tb->m_scrape_time_last = cachedTime.seconds();
tb->m_scrape_counter++;
Expand All @@ -378,7 +378,7 @@ TrackerList::receive_scrape_failed(Tracker* tb, const std::string& msg) {
if (itr == end() || tb->is_busy())
throw internal_error("TrackerList::receive_failed(...) called but the iterator is invalid.");

LT_LOG_TRACKER(INFO, "Failed to scrape tracker url:'%s' msg:'%s'.", tb->url().c_str(), msg.c_str());
LT_LOG_TRACKER(INFO, "failed to scrape tracker (url:%s msg:%s)", tb->url().c_str(), msg.c_str());

if (m_slot_scrape_failed)
m_slot_scrape_failed(tb, msg);
Expand Down
90 changes: 50 additions & 40 deletions src/tracker/tracker_udp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@

#include <sys/types.h>

#include <torrent/connection_manager.h>
#include <cstdio>

#include "rak/error_number.h"

#include "net/address_list.h"
#include "torrent/exceptions.h"
#include "torrent/connection_manager.h"
Expand All @@ -57,10 +58,10 @@
#include "manager.h"

#define LT_LOG_TRACKER(log_level, log_fmt, ...) \
lt_log_print_info(LOG_TRACKER_##log_level, m_parent->info(), "tracker", "[%u] " log_fmt, group(), __VA_ARGS__);
lt_log_print_info(LOG_TRACKER_##log_level, m_parent->info(), "tracker_udp", "[%u] " log_fmt, group(), __VA_ARGS__);

#define LT_LOG_TRACKER_DUMP(log_level, log_dump_data, log_dump_size, log_fmt, ...) \
lt_log_print_info_dump(LOG_TRACKER_##log_level, log_dump_data, log_dump_size, m_parent->info(), "tracker", "[%u] " log_fmt, group(), __VA_ARGS__);
lt_log_print_info_dump(LOG_TRACKER_##log_level, log_dump_data, log_dump_size, m_parent->info(), "tracker_udp", "[%u] " log_fmt, group(), __VA_ARGS__);

namespace torrent {

Expand Down Expand Up @@ -95,20 +96,14 @@ TrackerUdp::send_state(int state) {
close_directly();
m_latest_event = state;

// try {
// utils::uri_state uri_state;
// uri_parse_str(m_url, uri_state);
hostname_type hostname;

if (!parse_udp_url(m_url, hostname, m_port))
return receive_failed("could not parse hostname or port");

// } catch (utils::uri_error& e) {
// return receive_failed("Could not parse UDP hostname or port: " + std::string(e.what()));
// }
LT_LOG_TRACKER(DEBUG, "hostname lookup (address:%s)", hostname.data());

char hostname[1024];

if (std::sscanf(m_url.c_str(), "udp://%1023[^:]:%i", hostname, &m_port) != 2 ||
hostname[0] == '\0' ||
m_port <= 0 || m_port >= (1 << 16))
return receive_failed("Could not parse UDP hostname or port.");
m_sendState = state;

// Because we can only remember one slot, set any pending resolves blocked
// so that if this tracker is deleted, the member function won't be called.
Expand All @@ -117,14 +112,29 @@ TrackerUdp::send_state(int state) {
m_slot_resolver = NULL;
}

LT_LOG_TRACKER(DEBUG, "Tracker UDP hostname lookup for '%s'", hostname);
m_slot_resolver = make_resolver_slot(hostname);
}

m_sendState = state;
m_slot_resolver = manager->connection_manager()->resolver()(hostname, PF_INET, SOCK_DGRAM,
std::bind(&TrackerUdp::start_announce,
this,
std::placeholders::_1,
std::placeholders::_2));
bool
TrackerUdp::parse_udp_url(const std::string& url, hostname_type& hostname, int& port) const {
if (std::sscanf(m_url.c_str(), "udp://%1023[^:]:%i", hostname.data(), &port) == 2 && hostname[0] != '\0' &&
port > 0 && port < (1 << 16))
return true;

if (std::sscanf(m_url.c_str(), "udp://[%1023[^]]]:%i", hostname.data(), &port) == 2 && hostname[0] != '\0' &&
port > 0 && port < (1 << 16))
return true;

return false;
}

TrackerUdp::resolver_type*
TrackerUdp::make_resolver_slot(const hostname_type& hostname) {
return manager->connection_manager()->resolver()(hostname.data(), PF_UNSPEC, SOCK_DGRAM,
std::bind(&TrackerUdp::start_announce,
this,
std::placeholders::_1,
std::placeholders::_2));
}

void
Expand All @@ -135,20 +145,24 @@ TrackerUdp::start_announce(const sockaddr* sa, int err) {
}

if (sa == NULL)
return receive_failed("Could not resolve hostname.");
return receive_failed("could not resolve hostname");

m_connectAddress = *rak::socket_address::cast_from(sa);
m_connectAddress.set_port(m_port);

LT_LOG_TRACKER(DEBUG, "Tracker UDP address found '%s'", m_connectAddress.address_str().c_str());
LT_LOG_TRACKER(DEBUG, "address found (address:%s)", m_connectAddress.address_str().c_str());

if (!m_connectAddress.is_valid())
return receive_failed("Invalid tracker address.");
return receive_failed("invalid tracker address");

if (!get_fd().open_datagram() ||
!get_fd().set_nonblock() ||
!get_fd().bind(*rak::socket_address::cast_from(manager->connection_manager()->bind_address())))
return receive_failed("Could not open UDP socket.");
// TODO: Make each of these a separate error... at the very least separate open and bind.
if (!get_fd().open_datagram() || !get_fd().set_nonblock())
return receive_failed("could not open UDP socket");

auto bind_address = rak::socket_address::cast_from(manager->connection_manager()->bind_address());

if (bind_address->is_bindable() && !get_fd().bind(*bind_address))
return receive_failed("failed to bind socket to udp address '" + bind_address->pretty_address_str() + "' with error '" + rak::error_number::current().c_str() + "'");

m_readBuffer = new ReadBuffer;
m_writeBuffer = new WriteBuffer;
Expand All @@ -169,7 +183,7 @@ TrackerUdp::close() {
if (!get_fd().is_valid())
return;

LT_LOG_TRACKER(DEBUG, "Tracker UDP request cancelled: state:%s url:%s.",
LT_LOG_TRACKER(DEBUG, "request cancelled (state:%s url:%s)",
option_as_string(OPTION_TRACKER_EVENT, m_latest_event), m_url.c_str());

close_directly();
Expand All @@ -180,7 +194,7 @@ TrackerUdp::disown() {
if (!get_fd().is_valid())
return;

LT_LOG_TRACKER(DEBUG, "Tracker UDP request disowned: state:%s url:%s.",
LT_LOG_TRACKER(DEBUG, "request disowned (state:%s url:%s)",
option_as_string(OPTION_TRACKER_EVENT, m_latest_event), m_url.c_str());

close_directly();
Expand Down Expand Up @@ -225,7 +239,7 @@ TrackerUdp::receive_timeout() {
throw internal_error("TrackerUdp::receive_timeout() called but m_taskTimeout is still scheduled.");

if (--m_tries == 0) {
receive_failed("Unable to connect to UDP tracker.");
receive_failed("unable to connect to UDP tracker");
} else {
priority_queue_insert(&taskScheduler, &m_taskTimeout, (cachedTime + rak::timer::from_seconds(m_parent->info()->udp_timeout())).round_seconds());

Expand All @@ -245,7 +259,7 @@ TrackerUdp::event_read() {
m_readBuffer->reset_position();
m_readBuffer->set_end(s);

LT_LOG_TRACKER_DUMP(DEBUG, (const char*)m_readBuffer->begin(), s, "Tracker UDP reply.", 0);
LT_LOG_TRACKER_DUMP(DEBUG, (const char*)m_readBuffer->begin(), s, "received reply", 0);

if (s < 4)
return;
Expand Down Expand Up @@ -291,10 +305,6 @@ TrackerUdp::event_write() {

int __UNUSED s = write_datagram(m_writeBuffer->begin(), m_writeBuffer->size_end(), &m_connectAddress);

// TODO: If send failed, retry shortly or do i call receive_failed?
// if (s != m_writeBuffer->size_end())
// ;

manager->poll()->remove_write(this);
}

Expand All @@ -310,7 +320,7 @@ TrackerUdp::prepare_connect_input() {
m_writeBuffer->write_32(m_transactionId = random());

LT_LOG_TRACKER_DUMP(DEBUG, m_writeBuffer->begin(), m_writeBuffer->size_end(),
"Tracker UDP connect: id:%" PRIx32 ".", m_transactionId);
"prepare connect (id:%" PRIx32 ")", m_transactionId);
}

void
Expand Down Expand Up @@ -351,7 +361,7 @@ TrackerUdp::prepare_announce_input() {
throw internal_error("TrackerUdp::prepare_announce_input() ended up with the wrong size");

LT_LOG_TRACKER_DUMP(DEBUG, m_writeBuffer->begin(), m_writeBuffer->size_end(),
"Tracker UDP announce: state:%s id:%" PRIx32 " up_adj:%" PRIu64 " completed_adj:%" PRIu64 " left_adj:%" PRIu64 ".",
"prepare announce (state:%s id:%" PRIx32 " up_adj:%" PRIu64 " completed_adj:%" PRIu64 " left_adj:%" PRIu64 ")",
option_as_string(OPTION_TRACKER_EVENT, m_sendState),
m_transactionId, uploaded_adjusted, completed_adjusted, download_left);
}
Expand Down Expand Up @@ -399,7 +409,7 @@ TrackerUdp::process_error_output() {
m_readBuffer->read_32() != m_transactionId)
return false;

receive_failed("Received error message: " + std::string(m_readBuffer->position(), m_readBuffer->end()));
receive_failed("received error message: " + std::string(m_readBuffer->position(), m_readBuffer->end()));
return true;
}

Expand Down
6 changes: 6 additions & 0 deletions src/tracker/tracker_udp.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#ifndef LIBTORRENT_TRACKER_TRACKER_UDP_H
#define LIBTORRENT_TRACKER_TRACKER_UDP_H

#include <array>
#include <rak/socket_address.h>

#include "net/protocol_buffer.h"
Expand All @@ -50,6 +51,8 @@ namespace torrent {

class TrackerUdp : public SocketDatagram, public Tracker {
public:
typedef std::array<char, 1024> hostname_type;

typedef ProtocolBuffer<512> ReadBuffer;
typedef ProtocolBuffer<512> WriteBuffer;

Expand Down Expand Up @@ -90,6 +93,9 @@ class TrackerUdp : public SocketDatagram, public Tracker {
bool process_announce_output();
bool process_error_output();

bool parse_udp_url(const std::string& url, hostname_type& hostname, int& port) const;
resolver_type* make_resolver_slot(const hostname_type& hostname);

rak::socket_address m_connectAddress;
int m_port;

Expand Down