Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming #76

Merged
merged 11 commits into from
Aug 4, 2017
10 changes: 5 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ addons:
matrix:
include:
- {os: linux, rust: stable,
env: FLAGS=""}
env: FLAGS="--features tokio"}
- {os: linux, rust: beta,
env: FLAGS=""}
env: FLAGS="--features tokio"}
- {os: linux, rust: nightly,
env: FLAGS="--features clippy"}
env: FLAGS="--features clippy,tokio"}
- {os: osx, rust: stable,
env: FLAGS="--features full" PCAP_LIBDIR=/usr/local/opt/libpcap/lib}
- {os: osx, rust: beta,
Expand All @@ -23,6 +23,6 @@ before_install:
brew install homebrew/dupes/libpcap;
fi
script:
- cargo build -v $FLAGS
- cargo test -v $FLAGS
- cargo build -v $FLAGS
- cargo test -v $FLAGS
- cargo doc --no-deps
15 changes: 13 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ build = "build.rs"

[dependencies]
libc = "0.2"
clippy = { version = "0.0.134", optional = true }
clippy = { version = "*", optional = true, git="https://github.com/kraai/rust-clippy", branch="fix-build" }
mio = { version = "*", optional = true }
tokio-core = { version = "*", optional = true }
futures = { version = "*", optional = true }

[dev-dependencies]
tempdir = "0.3"
Expand All @@ -29,8 +32,12 @@ pcap-savefile-append = []
# This is disabled by default, because it requires libpcap >= 1.5.0.
pcap-fopen-offline-precision = []

# This feature enables access to the function Capture::stream.
# This is disabled by default, because it depends on a tokio and mio
tokio = ["mio", "tokio-core", "futures"]

# A shortcut to enable all features.
full = ["pcap-savefile-append", "pcap-fopen-offline-precision"]
full = ["pcap-savefile-append", "pcap-fopen-offline-precision", "tokio"]

[lib]
name = "pcap"
Expand All @@ -54,3 +61,7 @@ path = "examples/savefile.rs"
[[example]]
name = "getstatistics"
path = "examples/getstatistics.rs"

[[example]]
name = "streamlisten"
path = "examples/streamlisten.rs"
39 changes: 39 additions & 0 deletions examples/streamlisten.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
extern crate pcap;
extern crate futures;
extern crate tokio_core;

use pcap::{Capture, Packet, Error, Device};
use pcap::tokio::PacketCodec;
use tokio_core::reactor::Core;
use futures::stream::Stream;

pub struct SimpleDumpCodec;

impl PacketCodec for SimpleDumpCodec{
type Type = String;

fn decode<'p>(&mut self, packet: Packet<'p>) -> Result<Self::Type, Error> {
Ok(format!("{:?}", packet))

}
}

fn ma1n() -> Result<(),Error> {
let mut core = Core::new().unwrap();
let handle = core.handle();
let cap = Capture::from_device(Device::lookup()?)?.open()?.setnonblock()?;
let s = cap.stream(&handle, SimpleDumpCodec{})?;
let done = s.for_each(move |s| {
println!("{:?}", s);
Ok(())
});
core.run(done).unwrap();
Ok(())
}

fn main() {
match ma1n() {
Ok(()) => (),
Err(e) => println!("{:?}", e),
}
}
89 changes: 82 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@
#![cfg_attr(feature = "clippy", allow(redundant_closure_call))]

extern crate libc;
#[cfg(feature = "tokio")]
extern crate mio;
#[cfg(feature = "tokio")]
extern crate futures;
#[cfg(feature = "tokio")]
extern crate tokio_core;

use unique::Unique;

Expand All @@ -63,25 +69,31 @@ use std::slice;
use std::ops::Deref;
use std::mem;
use std::fmt;
#[cfg(feature = "tokio")]
use std::io;
#[cfg(not(windows))]
use std::os::unix::io::{RawFd, AsRawFd};

use self::Error::*;

mod raw;
mod unique;
#[cfg(feature = "tokio")]
pub mod tokio;

/// An error received from pcap
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, PartialEq)]
pub enum Error {
MalformedError(std::str::Utf8Error),
InvalidString,
PcapError(String),
InvalidLinktype,
TimeoutExpired,
NoMorePackets,
NonNonBlock,
InsufficientMemory,
InvalidInputString,
IoError(std::io::ErrorKind),
#[cfg(not(windows))]
InvalidRawFd,
}
Expand All @@ -103,9 +115,11 @@ impl fmt::Display for Error {
PcapError(ref e) => write!(f, "libpcap error: {}", e),
InvalidLinktype => write!(f, "invalid or unknown linktype"),
TimeoutExpired => write!(f, "timeout expired while reading from a live capture"),
NonNonBlock => write!(f, "must be in non-blocking mode to function"),
NoMorePackets => write!(f, "no more packets to read from the file"),
InsufficientMemory => write!(f, "insufficient memory"),
InvalidInputString => write!(f, "invalid input string (internal null)"),
IoError(ref e) => write!(f, "io error occurred: {:?}", e),
#[cfg(not(windows))]
InvalidRawFd => write!(f, "invalid raw file descriptor provided"),
}
Expand All @@ -120,9 +134,11 @@ impl std::error::Error for Error {
InvalidString => "libpcap returned a null string",
InvalidLinktype => "invalid or unknown linktype",
TimeoutExpired => "timeout expired while reading from a live capture",
NonNonBlock => "must be in non-blocking mode to function",
NoMorePackets => "no more packets to read from the file",
InsufficientMemory => "insufficient memory",
InvalidInputString => "invalid input string (internal null)",
IoError(..) => "io error occurred",
#[cfg(not(windows))]
InvalidRawFd => "invalid raw file descriptor provided",
}
Expand All @@ -148,6 +164,18 @@ impl From<std::str::Utf8Error> for Error {
}
}

impl From<std::io::Error> for Error {
fn from(obj: std::io::Error) -> Error {
IoError(obj.kind())
}
}

impl From<std::io::ErrorKind> for Error {
fn from(obj: std::io::ErrorKind) -> Error {
IoError(obj)
}
}

#[derive(Debug)]
/// A network device name and (potentially) pcap's description of it.
pub struct Device {
Expand Down Expand Up @@ -280,7 +308,7 @@ impl fmt::Debug for PacketHeader {
impl PartialEq for PacketHeader {
fn eq(&self, rhs: &PacketHeader) -> bool {
self.ts.tv_sec == rhs.ts.tv_sec && self.ts.tv_usec == rhs.ts.tv_usec &&
self.caplen == rhs.caplen && self.len == rhs.len
self.caplen == rhs.caplen && self.len == rhs.len
}
}

Expand Down Expand Up @@ -312,11 +340,14 @@ pub enum Precision {

/// Phantom type representing an inactive capture handle.
pub enum Inactive {}

/// Phantom type representing an active capture handle.
pub enum Active {}

/// Phantom type representing an offline capture handle, from a pcap dump file.
/// Implements `Activated` because it behaves nearly the same as a live handle.
pub enum Offline {}

/// Phantom type representing a dead capture handle. This can be use to create
/// new save files that are not generated from an active capture.
/// Implements `Activated` because it behaves nearly the same as a live handle.
Expand All @@ -325,7 +356,9 @@ pub enum Dead {}
pub unsafe trait Activated: State {}

unsafe impl Activated for Active {}

unsafe impl Activated for Offline {}

unsafe impl Activated for Dead {}

/// `Capture`s can be in different states at different times, and in these states they
Expand All @@ -335,8 +368,11 @@ unsafe impl Activated for Dead {}
pub unsafe trait State {}

unsafe impl State for Inactive {}

unsafe impl State for Active {}

unsafe impl State for Offline {}

unsafe impl State for Dead {}

/// This is a pcap capture handle which is an abstraction over the `pcap_t` provided by pcap.
Expand Down Expand Up @@ -370,15 +406,17 @@ unsafe impl State for Dead {}
/// println!("received packet! {:?}", packet);
/// }
/// ```
pub struct Capture<T: State + ?Sized> {
pub struct Capture<T: State + ? Sized> {
nonblock: bool,
handle: Unique<raw::pcap_t>,
_marker: PhantomData<T>,
}

impl<T: State + ?Sized> Capture<T> {
impl<T: State + ? Sized> Capture<T> {
fn new(handle: *mut raw::pcap_t) -> Capture<T> {
unsafe {
Capture {
nonblock: false,
handle: Unique::new(handle),
_marker: PhantomData,
}
Expand Down Expand Up @@ -537,7 +575,7 @@ impl Capture<Inactive> {
}

///# Activated captures include `Capture<Active>` and `Capture<Offline>`.
impl<T: Activated + ?Sized> Capture<T> {
impl<T: Activated + ? Sized> Capture<T> {
/// List the datalink types that this captured device supports.
pub fn list_datalinks(&self) -> Result<Vec<Linktype>, Error> {
unsafe {
Expand Down Expand Up @@ -637,6 +675,33 @@ impl<T: Activated + ?Sized> Capture<T> {
}
}

#[cfg(feature = "tokio")]
fn next_noblock<'a>(&'a mut self, fd: &mut tokio_core::reactor::PollEvented<tokio::SelectableFd>) -> Result<Packet<'a>, Error> {
if let futures::Async::NotReady = fd.poll_read() {
return Err(IoError(io::ErrorKind::WouldBlock))
} else {
return match self.next() {
Ok(p) => Ok(p),
Err(TimeoutExpired) => {
fd.need_read();
Err(IoError(io::ErrorKind::WouldBlock))
}
Err(e) => Err(e)
}
}
}

#[cfg(feature = "tokio")]
pub fn stream<C: tokio::PacketCodec>(self, handle: &tokio_core::reactor::Handle, codec: C) -> Result<tokio::PacketStream<T, C>, Error> {
if !self.nonblock {
return Err(NonNonBlock);
}
unsafe {
let fd = raw::pcap_get_selectable_fd(*self.handle);
tokio::PacketStream::new(self, fd, handle, codec)
}
}

/// Adds a filter to the capture using the given BPF program string. Internally
/// this is compiled using `pcap_compile()`.
///
Expand Down Expand Up @@ -670,6 +735,16 @@ impl Capture<Active> {
raw::pcap_sendpacket(*self.handle, buf.as_ptr() as _, buf.len() as _) == 0
})
}

pub fn setnonblock(mut self) -> Result<Capture<Active>, Error> {
with_errbuf(|err| unsafe {
if raw::pcap_setnonblock(*self.handle, 1, err) != 0 {
return Err(Error::new(err));
}
self.nonblock = true;
Ok(self)
})
}
}

impl Capture<Dead> {
Expand Down Expand Up @@ -697,7 +772,7 @@ impl AsRawFd for Capture<Active> {
}
}

impl<T: State + ?Sized> Drop for Capture<T> {
impl<T: State + ? Sized> Drop for Capture<T> {
fn drop(&mut self) {
unsafe { raw::pcap_close(*self.handle) }
}
Expand Down Expand Up @@ -747,7 +822,7 @@ fn cstr_to_string(ptr: *const libc::c_char) -> Result<Option<String>, Error> {
let string = if ptr.is_null() {
None
} else {
Some(unsafe {CStr::from_ptr(ptr as _)}.to_str()?.to_owned())
Some(unsafe { CStr::from_ptr(ptr as _) }.to_str()?.to_owned())
};
Ok(string)
}
Expand Down
4 changes: 2 additions & 2 deletions src/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ extern "C" {
pub fn pcap_setfilter(arg1: *mut pcap_t, arg2: *mut bpf_program) -> c_int;
pub fn pcap_setdirection(arg1: *mut pcap_t, arg2: pcap_direction_t) -> c_int;
// pub fn pcap_getnonblock(arg1: *mut pcap_t, arg2: *mut c_char) -> c_int;
// pub fn pcap_setnonblock(arg1: *mut pcap_t, arg2: c_int, arg3: *mut c_char) -> c_int;
pub fn pcap_setnonblock(arg1: *mut pcap_t, arg2: c_int, arg3: *mut c_char) -> c_int;
pub fn pcap_sendpacket(arg1: *mut pcap_t, arg2: *const c_uchar, arg3: c_int) -> c_int;
// pub fn pcap_statustostr(arg1: c_int) -> *const c_char;
// pub fn pcap_strerror(arg1: c_int) -> *const c_char;
Expand Down Expand Up @@ -170,7 +170,7 @@ extern "C" {
// pub fn pcap_lib_version() -> *const c_char;
// pub fn bpf_image(arg1: *const bpf_insn, arg2: c_int) -> *mut c_char;
// pub fn bpf_dump(arg1: *const bpf_program, arg2: c_int) -> ();
// pub fn pcap_get_selectable_fd(arg1: *mut pcap_t) -> c_int;
pub fn pcap_get_selectable_fd(arg1: *mut pcap_t) -> c_int;
}

#[cfg(windows)]
Expand Down
Loading