Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add windows ReadDirectoryChanges support #39

Merged
merged 26 commits into from
Nov 29, 2015
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
f24f3fe
WIP
maurizi Jun 7, 2015
79b8785
Windows: use SleepEx to allow completion routine to fire. Fix bad tr…
jmquigs Nov 17, 2015
d92ea4d
Merge branch 'master' of https://github.com/passcod/rsnotify into win…
jmquigs Nov 17, 2015
1592724
Windows: use full paths in reported events
jmquigs Nov 17, 2015
124652c
Windows: use try_recv instead of select!, process all actions in loop
jmquigs Nov 17, 2015
be214c8
Windows: fix watch_dir_recommended test by not panicking if channel i…
jmquigs Nov 17, 2015
1764e37
Windows: don't need to free overlapped/request in handler
jmquigs Nov 17, 2015
8d2167f
Windows: convert watch/unwatch send errors to Error::Generic
jmquigs Nov 17, 2015
12f0ae4
Windows: change test to give windows a little time to start watcher b…
jmquigs Nov 18, 2015
108e08c
Windows: allow write on the monitored directory
jmquigs Nov 18, 2015
549da9f
Windows: implement single file watch
jmquigs Nov 18, 2015
7e3ead6
Windows: implement test for single file watcher
jmquigs Nov 18, 2015
8f00aa1
Add appveyor file
jmquigs Nov 18, 2015
c536e64
appveyor: don't build the gnu targets
jmquigs Nov 18, 2015
34b9c75
Windows: fix some winapi imports
jmquigs Nov 19, 2015
672072a
Windows: remove needless return (clippy)
jmquigs Nov 19, 2015
6c658c6
Windows: wait in alertable sleep on shutdown
jmquigs Nov 19, 2015
3c309b7
Add me to contributors
jmquigs Nov 19, 2015
e6434a4
Windows: add a failing test that shows the shutdown leak
jmquigs Nov 19, 2015
02bca44
Windows: fix shutdown leak
jmquigs Nov 19, 2015
7dc1f55
Windows: increase watcher test wait time to 50ms
jmquigs Nov 19, 2015
ad5e274
Windows: eliminate warning in test
jmquigs Nov 20, 2015
99a6fde
Windows: replace transmute with Box::from_raw
jmquigs Nov 20, 2015
506de31
Merge branch 'win-shutdown-leak' into windows2
jmquigs Nov 21, 2015
e717093
Windows: wait for reply in watch()
jmquigs Nov 21, 2015
50af1c6
Windows: fix atom indent fail
jmquigs Nov 21, 2015
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ authors = [
"Antti Keränen <[email protected]>",
"Gilbert Röhrbein <[email protected]>",
"Jorge Israel Peña <[email protected]>",
"Michael Maurizi <mmaurizi@azavea.com>",
"Michael Maurizi <michael.maurizi@gmail.com>",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add yourself in there, too :)

"Pierre Baillet <[email protected]>",
"ShuYu Wang <[email protected]>",
"Jimmy Lu <[email protected]>",
"Francisco Giordano <[email protected]>",
"Jake Kerr",
"Ty Overby <[email protected]>",
"John Quigley <[email protected]>"
]

description = "Cross-platform filesystem notification library"
Expand Down Expand Up @@ -45,6 +46,18 @@ version = "^0.2.11"
[target.x86_64-apple-darwin.dependencies.fsevent-sys]
version = "^0.1"

[target.i686-pc-windows-gnu]
dependencies = { winapi = "0.2", kernel32-sys = "0.2.1" }

[target.x86_64-pc-windows-gnu]
dependencies = { winapi = "0.2", kernel32-sys = "0.2.1" }

[target.i686-pc-windows-msvc]
dependencies = { winapi = "0.2", kernel32-sys = "0.2.1" }

[target.x86_64-pc-windows-msvc]
dependencies = { winapi = "0.2", kernel32-sys = "0.2.1" }

[dev-dependencies]
tempfile = "^1.1.0"
tempdir = "^0.3.4"
19 changes: 19 additions & 0 deletions appveyor.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
environment:
matrix:
- TARGET: x86_64-pc-windows-msvc
- TARGET: i686-pc-windows-msvc
# skip the gnu targets to reduce build time
#- TARGET: x86_64-pc-windows-gnu
#- TARGET: i686-pc-windows-gnu
install:
- ps: Start-FileDownload "https://static.rust-lang.org/dist/rust-nightly-${env:TARGET}.exe"
- rust-nightly-%TARGET%.exe /VERYSILENT /NORESTART /DIR="C:\Program Files (x86)\Rust"
- SET PATH=%PATH%;C:\Program Files (x86)\Rust\bin
- SET PATH=%PATH%;C:\MinGW\bin
- rustc -V
- cargo -V

build: false

test_script:
- cargo test --verbose
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a badge to the readme for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can add the badge, but I think it will be broken until you set up the project there.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've just added it, so it should work :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the CI build, but there is a project ID that I can't see that is needed to add the badge. So I guess you'll have to add it :( Also if you like, you can remove #4 from the todo list on the readme.
http://www.appveyor.com/docs/status-badges

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do :)

6 changes: 5 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#[macro_use] extern crate log;
#[macro_use] extern crate bitflags;
#[cfg(target_os="macos")] extern crate fsevent_sys;
#[cfg(target_os="windows")] extern crate winapi;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this both here and in windows.rs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the one from windows.rs

extern crate libc;
extern crate filetime;

Expand All @@ -12,11 +13,13 @@ use std::convert::AsRef;

#[cfg(target_os="macos")] pub use self::fsevent::FsEventWatcher;
#[cfg(target_os="linux")] pub use self::inotify::INotifyWatcher;
#[cfg(target_os="windows")] pub use self::windows::ReadDirectoryChangesWatcher;
pub use self::null::NullWatcher;
pub use self::poll::PollWatcher;

#[cfg(target_os="linux")] pub mod inotify;
#[cfg(target_os="macos")] pub mod fsevent;
#[cfg(target_os="windows")] pub mod windows;
pub mod null;
pub mod poll;

Expand Down Expand Up @@ -57,7 +60,8 @@ pub trait Watcher: Sized {

#[cfg(target_os = "linux")] pub type RecommendedWatcher = INotifyWatcher;
#[cfg(target_os = "macos")] pub type RecommendedWatcher = FsEventWatcher;
#[cfg(not(any(target_os = "linux", target_os = "macos")))] pub type RecommendedWatcher = PollWatcher;
#[cfg(target_os = "windows")] pub type RecommendedWatcher = ReadDirectoryChangesWatcher;
#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))] pub type RecommendedWatcher = PollWatcher;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is becoming a bit cumbersome... if anyone has an idea to make it so that we don't have to add to the attribute each time, please have a go :)


pub fn new(tx: Sender<Event>) -> Result<RecommendedWatcher, Error> {
Watcher::new(tx)
Expand Down
307 changes: 307 additions & 0 deletions src/windows.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,307 @@
extern crate kernel32;

use winapi::{OVERLAPPED, LPOVERLAPPED, HANDLE, INVALID_HANDLE_VALUE,
ERROR_OPERATION_ABORTED, FILE_NOTIFY_INFORMATION, fileapi, winbase, winnt};

use std::collections::HashMap;
use std::mem;
use std::path::{Path, PathBuf};
use std::ptr;
use std::slice;
use std::sync::mpsc::{channel, Sender, Receiver};
use std::ffi::OsString;
use std::os::windows::ffi::{OsStrExt, OsStringExt};
use std::thread;
use std::os::raw::c_void;

use super::{Event, Error, op, Op, Watcher};

const BUF_SIZE: u32 = 16384;

#[derive(Debug,Clone)]
struct ReadData {
dir: PathBuf, // directory that is being watched
file: Option<PathBuf>, // if a file is being watched, this is its full path
}

struct ReadDirectoryRequest {
tx: Sender<Event>,
buffer: [u8; BUF_SIZE as usize],
handle: HANDLE,
data: ReadData
}

enum Action {
Watch(PathBuf),
Unwatch(PathBuf),
Stop
}

struct ReadDirectoryChangesServer {
rx: Receiver<Action>,
tx: Sender<Event>,
watches: HashMap<PathBuf, HANDLE>
}

impl ReadDirectoryChangesServer {
fn start(event_tx: Sender<Event>) -> Sender<Action> {
let (action_tx, action_rx) = channel();
thread::spawn(move || {
let server = ReadDirectoryChangesServer {
tx: event_tx,
rx: action_rx,
watches: HashMap::new()
};
server.run();
});
action_tx
}

fn run(mut self) {
loop {
// process all available actions first
let mut stopped = false;

while let Ok(action) = self.rx.try_recv() {
match action {
Action::Watch(path) => self.add_watch(path),
Action::Unwatch(path) => self.remove_watch(path),
Action::Stop => {
stopped = true;
for (_, handle) in &self.watches {
unsafe {
close_handle(*handle);
}
}
// wait for final read callback. required to avoid leaking callback
// memory
unsafe {
kernel32::SleepEx(500, 1);
}
break;
}
}
};

if stopped {
break;
}

// call sleepex with alertable flag so that our completion routine fires
unsafe {
kernel32::SleepEx(500, 1);
}
}
}

fn add_watch(&mut self, path: PathBuf) {
// path must exist and be either a file or directory
if !path.is_dir() && !path.is_file() {
let _ = self.tx.send(Event {
path: Some(path),
op: Err(Error::Generic("Input watch path is neither a file nor a directory.".to_owned()))
});
return;
}

let (watching_file,dir_target) = {
if path.is_dir() {
(false,path.clone())
} else {
// emulate file watching by watching the parent directory
(true,path.parent().unwrap().to_path_buf())
}
};

let encoded_path: Vec<u16> = dir_target.as_os_str().encode_wide().chain(Some(0)).collect();
let handle;
unsafe {
handle = kernel32::CreateFileW(
encoded_path.as_ptr(),
winnt::FILE_LIST_DIRECTORY,
winnt::FILE_SHARE_READ | winnt::FILE_SHARE_DELETE | winnt::FILE_SHARE_WRITE,
ptr::null_mut(),
fileapi::OPEN_EXISTING,
winbase::FILE_FLAG_BACKUP_SEMANTICS | winbase::FILE_FLAG_OVERLAPPED,
ptr::null_mut());

if handle == INVALID_HANDLE_VALUE {
let err = if watching_file {
Err(Error::Generic("You attempted to watch a single file, but parent directory could not be opened.".to_owned()))
} else {
// TODO: Call GetLastError for better error info?
Err(Error::PathNotFound)
};
let _ = self.tx.send(Event {
path: None,
op: err
});
return;
}
}
let wf = if watching_file {
Some(path.clone())
} else {
None
};
let rd = ReadData {
dir: dir_target,
file: wf
};
self.watches.insert(path.clone(), handle);
start_read(&rd, &self.tx, handle);
}

fn remove_watch(&mut self, path: PathBuf) {
if let Some(handle) = self.watches.remove(&path) {
unsafe {
close_handle(handle);
}
}
}
}

unsafe fn close_handle(handle: HANDLE) {
// TODO: Handle errors
kernel32::CancelIo(handle);
kernel32::CloseHandle(handle);
}

fn start_read(rd: &ReadData, tx: &Sender<Event>, handle: HANDLE) {
let mut request = Box::new(ReadDirectoryRequest {
tx: tx.clone(),
handle: handle,
buffer: [0u8; BUF_SIZE as usize],
data: rd.clone()
});

let flags = winnt::FILE_NOTIFY_CHANGE_FILE_NAME
| winnt::FILE_NOTIFY_CHANGE_DIR_NAME
| winnt::FILE_NOTIFY_CHANGE_ATTRIBUTES
| winnt::FILE_NOTIFY_CHANGE_SIZE
| winnt::FILE_NOTIFY_CHANGE_LAST_WRITE
| winnt::FILE_NOTIFY_CHANGE_CREATION
| winnt::FILE_NOTIFY_CHANGE_SECURITY;

let monitor_subdir = if (&request.data.file).is_none() {
1
} else {
0
};

unsafe {
let mut overlapped: Box<OVERLAPPED> = Box::new(mem::zeroed());

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only time these get deallocated is when handling an event. When you shut down the thread you don't wait for the final set of callbacks to occur so you can cleanup the stuff, thus leaking any of these that were still in progress.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can wait for the events on shutdown, but I suppose there is no way to check the handle to see if it is still valid in the completion routine without issuing another read call. In that case, I guess I have to pass through some state flag to tell the routine that the server is shutdown and it shouldn't try any more reads.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nevermind, I get ERROR_OPERATION_ABORTED on the final read. I can key off that...

// When using callback based async requests, we are allowed to use the hEvent member
// for our own purposes

let req_buf = request.buffer.as_mut_ptr() as *mut c_void;
let request_p: *mut c_void = mem::transmute(request);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be Box::into_raw

overlapped.hEvent = request_p;

// This is using an asynchronous call with a completion routine for receiving notifications
// An I/O completion port would probably be more performant
kernel32::ReadDirectoryChangesW(
handle,
req_buf,
BUF_SIZE,
monitor_subdir,
flags,
&mut 0u32 as *mut u32, // This parameter is not used for async requests
&mut *overlapped as *mut OVERLAPPED,
Some(handle_event));

mem::forget(overlapped);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably should pass Box::into_raw(overlapped) directly to ReadDirectoryChangesW instead of doing this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could do that; but in my other branch I check the return value here, because It can fail. In the fail case you have to re-box the memory to get a drop:
https://github.com/jmquigs/rsnotify/blob/7dc1f555ff065154eb9e57134631e52bb958c6dc/src/windows.rs#L231
...maybe I should just include those changes here

}
}

unsafe extern "system" fn handle_event(error_code: u32, _bytes_written: u32, overlapped: LPOVERLAPPED) {
let overlapped: Box<OVERLAPPED> = Box::from_raw(overlapped);
let request: Box<ReadDirectoryRequest> = Box::from_raw(overlapped.hEvent as *mut _);

if error_code == ERROR_OPERATION_ABORTED {
// received when dir is unwatched or watcher is shutdown; return and let overlapped/request
// get drop-cleaned
return;
}

// Get the next request queued up as soon as possible
start_read(&request.data, &request.tx, request.handle);

// The FILE_NOTIFY_INFORMATION struct has a variable length due to the variable length string
// as its last member. Each struct contains an offset for getting the next entry in the buffer
let mut cur_offset: *const u8 = request.buffer.as_ptr();
let mut cur_entry: *const FILE_NOTIFY_INFORMATION = mem::transmute(cur_offset);
loop {
// filename length is size in bytes, so / 2
let len = (*cur_entry).FileNameLength as usize / 2;
let encoded_path: &[u16] = slice::from_raw_parts((*cur_entry).FileName.as_ptr(), len);
// prepend root to get a full path
let path = request.data.dir.join(PathBuf::from(OsString::from_wide(encoded_path)));

// if we are watching a single file, ignore the event unless the path is exactly
// the watched file
let skip = match request.data.file {
None => false,
Some(ref watch_path) => *watch_path != path
};

if !skip {
let op = match (*cur_entry).Action {
winnt::FILE_ACTION_ADDED => op::CREATE,
winnt::FILE_ACTION_REMOVED => op::REMOVE,
winnt::FILE_ACTION_MODIFIED => op::WRITE,
winnt::FILE_ACTION_RENAMED_OLD_NAME | winnt::FILE_ACTION_RENAMED_NEW_NAME => op::RENAME,
_ => Op::empty()
};


let evt = Event {
path: Some(path),
op: Ok(op)
};
let _ = request.tx.send(evt);
}

if (*cur_entry).NextEntryOffset == 0 {
break;
}
cur_offset = cur_offset.offset((*cur_entry).NextEntryOffset as isize);
cur_entry = mem::transmute(cur_offset);
}
}

pub struct ReadDirectoryChangesWatcher {
tx: Sender<Action>
}

impl Watcher for ReadDirectoryChangesWatcher {
fn new(event_tx: Sender<Event>) -> Result<ReadDirectoryChangesWatcher, Error> {
let action_tx = ReadDirectoryChangesServer::start(event_tx);

Ok(ReadDirectoryChangesWatcher {
tx: action_tx
})
}

fn watch<P: AsRef<Path>>(&mut self, path: P) -> Result<(), Error> {
// path must exist and be either a file or directory
let pb = path.as_ref().to_path_buf();
if !pb.is_dir() && !pb.is_file() {
return Err(Error::Generic("Input watch path is neither a file nor a directory.".to_owned()));
}

self.tx.send(Action::Watch(path.as_ref().to_path_buf()))
.map_err(|_| Error::Generic("Error sending to internal channel".to_owned()))
}

fn unwatch<P: AsRef<Path>>(&mut self, path: P) -> Result<(), Error> {
self.tx.send(Action::Unwatch(path.as_ref().to_path_buf()))
.map_err(|_| Error::Generic("Error sending to internal channel".to_owned()))
}
}

impl Drop for ReadDirectoryChangesWatcher {
fn drop(&mut self) {
let _ = self.tx.send(Action::Stop);
}
}
Loading