Skip to content

Commit

Permalink
Merge pull request #134 from sopium/master
Browse files Browse the repository at this point in the history
feat: update to tokio 0.2 and futures 0.3
  • Loading branch information
hannobraun authored Nov 29, 2019
2 parents 0462e70 + 111c2df commit abd7936
Show file tree
Hide file tree
Showing 10 changed files with 76 additions and 135 deletions.
11 changes: 11 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,18 @@ jobs:

steps:
- uses: actions/checkout@v1
- name: Install rust (1.39)
uses: actions-rs/toolchain@v1
with:
toolchain: 1.39.0
override: true
- name: Check rust and cargo version
run: rustc -V && cargo -V
- name: Build
run: cargo build --verbose
- name: Run tests
run: cargo test --verbose
- name: Build (no default features)
run: cargo build --verbose --no-default-features
- name: Run tests (no default features)
run: cargo test --verbose --no-default-features
21 changes: 11 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ authors = [
"Cristian Kubis <[email protected]>",
"Frank Denis <[email protected]>"
]
edition = "2018"

description = "Idiomatic wrapper for inotify"
documentation = "https://docs.rs/inotify"
Expand All @@ -26,21 +27,21 @@ travis-ci = { repository = "inotify-rs/inotify" }

[features]
default = ["stream"]
stream = ["futures", "mio", "tokio", "tokio-io", "tokio-reactor"]
stream = ["futures-core", "mio", "tokio"]


[dependencies]
bitflags = "1"
futures = { version = "0.1", optional = true }
inotify-sys = "0.1.3"
libc = "0.2"
mio = { version = "0.6", optional = true }
tokio = { version = "0.1", optional = true }
tokio-io = { version = "0.1", optional = true }
tokio-reactor = { version = "0.1", optional = true }
bitflags = "1"
futures-core = { version = "0.3.1", optional = true }
inotify-sys = "0.1.3"
libc = "0.2"
mio = { version = "0.6", optional = true }
tokio = { version = "0.2.1", optional = true, features = ["io-driver"] }

[dev-dependencies]
tempdir = "0.3"
tempdir = "0.3"
futures-util = "0.3.1"
tokio = { version = "0.2.1", features = ["macros", "rt-core"] }

[[example]]
name = "stream"
Expand Down
17 changes: 6 additions & 11 deletions examples/stream.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,19 @@
extern crate futures;
extern crate inotify;
extern crate tempdir;


use std::{
fs::File,
io,
thread,
time::Duration,
};

use futures::Stream;
use futures_util::StreamExt;
use inotify::{
Inotify,
WatchMask,
};
use tempdir::TempDir;


fn main() -> Result<(), io::Error> {
#[tokio::main]
async fn main() -> Result<(), io::Error> {
let mut inotify = Inotify::init()
.expect("Failed to initialize inotify");

Expand All @@ -34,10 +29,10 @@ fn main() -> Result<(), io::Error> {
});

let mut buffer = [0; 32];
let stream = inotify.event_stream(&mut buffer);
let mut stream = inotify.event_stream(&mut buffer)?;

for event in stream.wait() {
print!("event: {:?}\n", event);
while let Some(event_or_error) = stream.next().await {
println!("event: {:?}", event_or_error?);
}

Ok(())
Expand Down
3 changes: 0 additions & 3 deletions examples/watch.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
extern crate inotify;


use std::env;

use inotify::{
Expand Down
7 changes: 4 additions & 3 deletions src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use std::{

use inotify_sys as ffi;

use fd_guard::FdGuard;
use watches::WatchDescriptor;
use crate::fd_guard::FdGuard;
use crate::watches::WatchDescriptor;


/// Iterator over inotify events
Expand Down Expand Up @@ -111,7 +111,7 @@ impl<'a> Event<&'a OsStr> {
let mask = EventMask::from_bits(event.mask)
.expect("Failed to convert event mask. This indicates a bug.");

let wd = ::WatchDescriptor {
let wd = crate::WatchDescriptor {
id: event.wd,
fd,
};
Expand Down Expand Up @@ -195,6 +195,7 @@ impl<'a> Event<&'a OsStr> {
(bytes_consumed, event)
}

#[cfg(feature = "stream")]
pub(crate) fn into_owned(&self) -> EventOwned {
Event {
wd: self.wd.clone(),
Expand Down
37 changes: 6 additions & 31 deletions src/inotify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,17 @@ use libc::{
fcntl,
};

use events::Events;
use fd_guard::FdGuard;
use util::read_into_buffer;
use watches::{
use crate::events::Events;
use crate::fd_guard::FdGuard;
use crate::util::read_into_buffer;
use crate::watches::{
WatchDescriptor,
WatchMask,
};

#[cfg(feature = "stream")]
use tokio_reactor::Handle;

#[cfg(feature = "stream")]
use stream::EventStream;
use crate::stream::EventStream;


/// Idiomatic Rust wrapper around Linux's inotify API
Expand Down Expand Up @@ -395,36 +393,13 @@ impl Inotify {
/// infinite source of events.
///
/// An internal buffer which can hold the largest possible event is used.
///
/// The event stream will be associated with the default reactor. See
/// [`Inotify::event_stream_with_handle`], if you need more control over the
/// reactor used.
///
/// [`Inotify::event_stream_with_handle`]: struct.Inotify.html#method.event_stream_with_handle
#[cfg(feature = "stream")]
pub fn event_stream<T>(&mut self, buffer: T)
-> EventStream<T>
where
T: AsMut<[u8]> + AsRef<[u8]>,
{
EventStream::new(self.fd.clone(), buffer)
}

/// Create a stream which collects events, associated with the given
/// reactor.
///
/// This functions identically to [`Inotify::event_stream`], except that
/// the returned stream will be associated with the given reactor, rather
/// than the default.
///
/// [`Inotify::event_stream`]: struct.Inotify.html#method.event_stream
#[cfg(feature = "stream")]
pub fn event_stream_with_handle<T>(&mut self, handle: &Handle, buffer: T)
-> io::Result<EventStream<T>>
where
T: AsMut<[u8]> + AsRef<[u8]>,
{
EventStream::new_with_handle(self.fd.clone(), handle, buffer)
EventStream::new(self.fd.clone(), buffer)
}

/// Closes the inotify instance
Expand Down
17 changes: 3 additions & 14 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,6 @@
#[macro_use]
extern crate bitflags;

extern crate libc;
extern crate inotify_sys;

#[cfg(feature = "stream")]
#[macro_use]
extern crate futures;

#[cfg(feature = "stream")]
extern crate tokio_reactor;


mod events;
mod fd_guard;
mod inotify;
Expand All @@ -94,14 +83,14 @@ mod watches;
mod stream;


pub use events::{
pub use crate::events::{
Event,
EventMask,
EventOwned,
Events,
};
pub use inotify::Inotify;
pub use watches::{
pub use crate::inotify::Inotify;
pub use crate::watches::{
WatchDescriptor,
WatchMask,
};
Expand Down
80 changes: 27 additions & 53 deletions src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,24 @@
extern crate mio;
extern crate tokio_io;


use std::{
io,
ops::Deref,
pin::Pin,
sync::Arc,
task::{Poll, Context},
};

use self::{
mio::{
event::Evented,
unix::EventedFd,
},
tokio_io::AsyncRead,
};
use futures::{
Async,
Poll,
Stream,
};
use tokio_reactor::{
Handle,
PollEvented,
use mio::{
event::Evented,
unix::EventedFd,
};
use tokio::io::{AsyncRead, PollEvented};
use futures_core::{Stream, ready};

use events::{
use crate::events::{
Event,
EventOwned,
};
use fd_guard::FdGuard;
use util::read_into_buffer;
use crate::fd_guard::FdGuard;
use crate::util::read_into_buffer;


/// Stream of inotify events
Expand All @@ -50,25 +38,9 @@ where
T: AsMut<[u8]> + AsRef<[u8]>,
{
/// Returns a new `EventStream` associated with the default reactor.
pub(crate) fn new(fd: Arc<FdGuard>, buffer: T) -> Self {
EventStream {
fd: PollEvented::new(EventedFdGuard(fd)),
buffer: buffer,
buffer_pos: 0,
unused_bytes: 0,
}
}

/// Returns a new `EventStream` associated with the specified reactor.
pub(crate) fn new_with_handle(
fd : Arc<FdGuard>,
handle: &Handle,
buffer: T,
)
-> io::Result<Self>
{
pub(crate) fn new(fd: Arc<FdGuard>, buffer: T) -> io::Result<Self> {
Ok(EventStream {
fd: PollEvented::new_with_handle(EventedFdGuard(fd), handle)?,
fd: PollEvented::new(EventedFdGuard(fd))?,
buffer: buffer,
buffer_pos: 0,
unused_bytes: 0,
Expand All @@ -80,34 +52,36 @@ impl<T> Stream for EventStream<T>
where
T: AsMut<[u8]> + AsRef<[u8]>,
{
type Item = EventOwned;
type Error = io::Error;
type Item = io::Result<EventOwned>;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
{
if self.unused_bytes == 0 {
// Safety: safe because we never move out of `self_`.
let self_ = unsafe { self.get_unchecked_mut() };

if self_.unused_bytes == 0 {
// Nothing usable in buffer. Need to reset and fill buffer.
self.buffer_pos = 0;
self.unused_bytes = try_ready!(self.fd.poll_read(&mut self.buffer.as_mut()));
self_.buffer_pos = 0;
self_.unused_bytes = ready!(Pin::new(&mut self_.fd).poll_read(cx, self_.buffer.as_mut()))?;
}

if self.unused_bytes == 0 {
if self_.unused_bytes == 0 {
// The previous read returned `0` signalling end-of-file. Let's
// signal end-of-stream to the caller.
return Ok(Async::Ready(None));
return Poll::Ready(None);
}

// We have bytes in the buffer. inotify doesn't put partial events in
// there, and we only take complete events out. That means we have at
// least one event in there and can call `from_buffer` to take it out.
let (bytes_consumed, event) = Event::from_buffer(
Arc::downgrade(self.fd.get_ref()),
&self.buffer.as_ref()[self.buffer_pos..],
Arc::downgrade(self_.fd.get_ref()),
&self_.buffer.as_ref()[self_.buffer_pos..],
);
self.buffer_pos += bytes_consumed;
self.unused_bytes -= bytes_consumed;
self_.buffer_pos += bytes_consumed;
self_.unused_bytes -= bytes_consumed;

Ok(Async::Ready(Some(event.into_owned())))
Poll::Ready(Some(Ok(event.into_owned())))
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/watches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{

use inotify_sys as ffi;

use fd_guard::FdGuard;
use crate::fd_guard::FdGuard;


bitflags! {
Expand Down
Loading

0 comments on commit abd7936

Please sign in to comment.