Skip to content
This repository was archived by the owner on Jun 27, 2022. It is now read-only.

Add read timeout to socket. #8

Merged
merged 14 commits into from
Jun 22, 2015
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ log = "0.3"
env_logger = "0.3"
rand = "0.3"

[dependencies.with_read_timeout]
path = "with_read_timeout"
version = "*"

[dev-dependencies]
quickcheck = "*"

Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ extern crate time;
extern crate num;
#[macro_use] extern crate log;
#[cfg(test)] extern crate quickcheck;
extern crate with_read_timeout;

// Public API
pub use socket::UtpSocket;
Expand Down
147 changes: 82 additions & 65 deletions src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::io::{Result, Error, ErrorKind};
use util::{now_microseconds, ewma};
use packet::{Packet, PacketType, Encodable, Decodable, ExtensionType, HEADER_SIZE};
use rand;
use with_read_timeout::WithReadTimeout;

// For simplicity's sake, let us assume no packet will ever exceed the
// Ethernet maximum transfer unit of 1500 bytes.
Expand Down Expand Up @@ -234,7 +235,7 @@ impl UtpSocket {
let mut len = 0;
let mut buf = [0; BUF_SIZE];

// let syn_timeout = socket.congestion_timeout;
let mut syn_timeout = socket.congestion_timeout as i64;
for _ in (0u8..5) {
packet.set_timestamp_microseconds(now_microseconds());

Expand All @@ -246,14 +247,15 @@ impl UtpSocket {

// Validate response
// socket.socket.set_read_timeout(Some(syn_timeout));
match socket.socket.recv_from(&mut buf) {
match socket.socket.recv_timeout(&mut buf, syn_timeout) {
// Ok((_read, src)) if src != socket.connected_to => continue,
Ok((read, src)) => { socket.connected_to = src; len = read; break; },
// Err(ref e) if e.kind == TimedOut => {
// debug!("Timed out, retrying");
// syn_timeout *= 2;
// continue;
// },
Err(ref e) if (e.kind() == ErrorKind::WouldBlock ||
e.kind() == ErrorKind::TimedOut) => {
debug!("Timed out, retrying");
syn_timeout *= 2;
continue;
},
Err(e) => return Err(e),
};
}
Expand Down Expand Up @@ -336,18 +338,19 @@ impl UtpSocket {

fn recv(&mut self, buf: &mut[u8]) -> Result<(usize,SocketAddr)> {
let mut b = [0; BUF_SIZE + HEADER_SIZE];
// if self.state != SocketState::New {
// debug!("setting read timeout of {} ms", self.congestion_timeout);
// self.socket.set_read_timeout(Some(self.congestion_timeout));
// }
let (read, src) = match self.socket.recv_from(&mut b) {
// Err(ref e) if e.kind == TimedOut => {
// debug!("recv_from timed out");
// self.congestion_timeout = self.congestion_timeout * 2;
// self.cwnd = MSS;
// self.send_fast_resend_request();
// return Ok((0, self.connected_to));
// },
let timeout = if self.state != SocketState::New {
debug!("setting read timeout of {} ms", self.congestion_timeout);
self.congestion_timeout as i64
} else { 0 };
let (read, src) = match self.socket.recv_timeout(&mut b, timeout) {
Err(ref e) if (e.kind() == ErrorKind::WouldBlock ||
e.kind() == ErrorKind::TimedOut) => {
debug!("recv_from timed out");
self.congestion_timeout = self.congestion_timeout * 2;
self.cwnd = MSS;
self.send_fast_resend_request();
return Ok((0, self.connected_to));
},
Ok(x) => x,
Err(e) => return Err(e),
};
Expand Down Expand Up @@ -631,6 +634,25 @@ impl UtpSocket {
return sack;
}

/// Sends a fast resend request to the remote peer.
///
/// A fast resend request consists of sending three STATE packets (acknowledging the last
/// received packet) in quick succession.
fn send_fast_resend_request(&self) {
for _ in 0..3 {
let mut packet = Packet::new();
packet.set_type(PacketType::State);
let self_t_micro: u32 = now_microseconds();
let other_t_micro: u32 = 0;
packet.set_timestamp_microseconds(self_t_micro);
packet.set_timestamp_difference_microseconds((self_t_micro - other_t_micro));
packet.set_connection_id(self.sender_connection_id);
packet.set_seq_nr(self.seq_nr);
packet.set_ack_nr(self.ack_nr);
let _ = self.socket.send_to(&packet.to_bytes()[..], self.connected_to);
}
}

fn resend_lost_packet(&mut self, lost_packet_nr: u16) {
debug!("---> resend_lost_packet({}) <---", lost_packet_nr);
match self.send_window.iter().position(|pkt| pkt.seq_nr() == lost_packet_nr) {
Expand Down Expand Up @@ -1662,62 +1684,57 @@ mod test {
iotry!(server.recv_from(&mut buf));
}

// #[test]
// #[ignore]
// // `std::net::UdpSocket` no longer supports timeouts, so this test is deprecated for now.
// fn test_socket_timeout_request() {
// let (server_addr, client_addr) = (next_test_ip4().to_socket_addrs().unwrap().next().unwrap(),
// next_test_ip4().to_socket_addrs().unwrap().next().unwrap());
#[test]
fn test_socket_timeout_request() {
let (server_addr, client_addr) = (next_test_ip4().to_socket_addrs().unwrap().next().unwrap(),
next_test_ip4().to_socket_addrs().unwrap().next().unwrap());

// let client = iotry!(UtpSocket::bind(client_addr));
// let mut server = iotry!(UtpSocket::bind(server_addr));
// const LEN: usize = 512;
// let data = (0..LEN).map(|idx| idx as u8).collect::<Vec<u8>>();
// let d = data.clone();
let client = iotry!(UtpSocket::bind(client_addr));
let mut server = iotry!(UtpSocket::bind(server_addr));
const LEN: usize = 512;
let data = (0..LEN).map(|idx| idx as u8).collect::<Vec<u8>>();
let d = data.clone();

// assert!(server.state == SocketState::New);
// assert!(client.state == SocketState::New);
assert!(server.state == SocketState::New);
assert!(client.state == SocketState::New);

// // Check proper difference in client's send connection id and receive connection id
// assert_eq!(client.sender_connection_id, client.receiver_connection_id + 1);
// Check proper difference in client's send connection id and receive connection id
assert_eq!(client.sender_connection_id, client.receiver_connection_id + 1);

// thread::spawn(move || {
// let mut client = iotry!(UtpSocket::connect(server_addr));
// assert!(client.state == SocketState::Connected);
// assert_eq!(client.connected_to, server_addr);
// iotry!(client.send_to(&d[..]));
// drop(client);
// });
thread::spawn(move || {
let mut client = iotry!(UtpSocket::connect(server_addr));
assert!(client.state == SocketState::Connected);
assert_eq!(client.connected_to, server_addr);
iotry!(client.send_to(&d[..]));
drop(client);
});

// let mut buf = [0u8; BUF_SIZE];
// match server.recv(&mut buf) {
// e => println!("{:?}", e),
// }
// // After establishing a new connection, the server's ids are a mirror of the client's.
// assert_eq!(server.receiver_connection_id, server.sender_connection_id + 1);
// assert_eq!(server.connected_to, client_addr);
let mut buf = [0u8; BUF_SIZE];
server.recv(&mut buf).unwrap();
// After establishing a new connection, the server's ids are a mirror of the client's.
assert_eq!(server.receiver_connection_id, server.sender_connection_id + 1);

// assert!(server.state == SocketState::Connected);
assert!(server.state == SocketState::Connected);

// // Purposefully read from UDP socket directly and discard it, in order
// // to behave as if the packet was lost and thus trigger the timeout
// // handling in the *next* call to `UtpSocket.recv_from`.
// iotry!(server.socket.recv_from(&mut buf));
// Purposefully read from UDP socket directly and discard it, in order
// to behave as if the packet was lost and thus trigger the timeout
// handling in the *next* call to `UtpSocket.recv_from`.
iotry!(server.socket.recv_from(&mut buf));

// // Set a much smaller than usual timeout, for quicker test completion
// server.congestion_timeout = 50;
// Set a much smaller than usual timeout, for quicker test completion
server.congestion_timeout = 50;

// // Now wait for the previously discarded packet
// loop {
// match server.recv_from(&mut buf) {
// Ok((0, _)) => continue,
// Ok(_) => break,
// Err(e) => panic!("{:?}", e),
// }
// }
// Now wait for the previously discarded packet
loop {
match server.recv_from(&mut buf) {
Ok((0, _)) => continue,
Ok(_) => break,
Err(e) => panic!("{}", e),
}
}

// drop(server);
// }
drop(server);
}

#[test]
fn test_sorted_buffer_insertion() {
Expand Down
22 changes: 22 additions & 0 deletions with_read_timeout/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
name = "with_read_timeout"
version = "0.1.0"
authors = ["Ricardo Martins <[email protected]>"]

[target.x86_64-unknown-linux-gnu.dependencies]
nix = "*"

[target.i686-unknown-linux-gnu.dependencies]
nix = "*"

[target.x86_64-apple-darwin.dependencies]
nix = "*"

[target.i686-apple-darwin.dependencies]
nix = "*"

[target.i686-pc-windows-gnu.dependencies]
libc = "*"

[target.x86_64-pc-windows-gnu.dependencies]
libc = "*"
113 changes: 113 additions & 0 deletions with_read_timeout/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#[cfg(unix)]
extern crate nix;
#[cfg(windows)]
extern crate libc;

use std::io::{Error, ErrorKind, Result};
use std::net::{UdpSocket, SocketAddr};

/// A trait to make time-limited reads from socket-like objects.
pub trait WithReadTimeout {
/// Receives data from the object, blocking for at most the specified number of milliseconds.
/// On success, returns the number of bytes read and the address from whence the data came. If
/// the timeout expires, it returns `ErrorKind::WouldBlock`.
fn recv_timeout(&mut self, &mut [u8], i64) -> Result<(usize, SocketAddr)>;
}

impl WithReadTimeout for UdpSocket {
#[cfg(unix)]
fn recv_timeout(&mut self, buf: &mut [u8], timeout: i64) -> Result<(usize, SocketAddr)> {
use nix::sys::socket::{SockLevel, sockopt, setsockopt};
use nix::sys::time::TimeVal;
use std::os::unix::io::AsRawFd;

setsockopt(self.as_raw_fd(),
SockLevel::Socket,
sockopt::ReceiveTimeout,
&TimeVal::milliseconds(timeout)).unwrap();

fn map_os_error(e: Error) -> Error {
// TODO: Replace with constant from libc
const EAGAIN: i32 = 35;

match e.raw_os_error() {
Some(EAGAIN) => Error::new(ErrorKind::WouldBlock, ""),
_ => e
}
}
self.recv_from(buf).map_err(map_os_error)
}

#[cfg(windows)]
fn recv_timeout(&mut self, buf: &mut [u8], timeout: i64) -> Result<(usize, SocketAddr)> {
use select::fd_set;
use std::os::windows::io::AsRawSocket;
use libc;

// Initialize relevant data structures
let mut readfds = fd_set::new();
let null = std::ptr::null_mut();

fd_set(&mut readfds, self.as_raw_socket());

// Set timeout
let mut tv = libc::timeval {
tv_sec: timeout as i32 / 1000,
tv_usec: (timeout as i32 % 1000) * 1000,
};

// In Windows, the first argument to `select` is ignored.
let retval = unsafe { select::select(0, &mut readfds, null, null, &mut tv) };
if retval == 0 {
return Err(Error::new(ErrorKind::TimedOut, "Time limit expired"));
} else if retval < 0 {
return Err(Error::last_os_error());
}

self.recv_from(buf)
}
}

// Most of the following was copied from 'rust/src/libstd/sys/windows/net.rs'
#[cfg(windows)]
mod select {
use libc;

pub const FD_SETSIZE: usize = 64;

#[repr(C)]
pub struct fd_set {
fd_count: libc::c_uint,
fd_array: [libc::SOCKET; FD_SETSIZE],
}

pub fn fd_set(set: &mut fd_set, s: libc::SOCKET) {
set.fd_array[set.fd_count as usize] = s;
set.fd_count += 1;
}

impl fd_set {
pub fn new() -> fd_set {
fd_set {
fd_count: 0,
fd_array: [0; FD_SETSIZE],
}
}
}

#[link(name = "ws2_32")]
extern "system" {
pub fn select(nfds: libc::c_int,
readfds: *mut fd_set,
writefds: *mut fd_set,
exceptfds: *mut fd_set,
timeout: *mut libc::timeval) -> libc::c_int;
}
}

#[test]
fn test_socket_timeout() {
let mut socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut buf = [0; 10];
assert!(socket.recv_timeout(&mut buf, 100).is_err());
}