diff --git a/Cargo.toml b/Cargo.toml index 29a053ab..2d7a52a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,13 +6,14 @@ authors = [ "Antti Keränen ", "Gilbert Röhrbein ", "Jorge Israel Peña ", - "Michael Maurizi ", + "Michael Maurizi ", "Pierre Baillet ", "ShuYu Wang ", "Jimmy Lu ", "Francisco Giordano ", "Jake Kerr", "Ty Overby ", + "John Quigley " ] description = "Cross-platform filesystem notification library" @@ -45,6 +46,18 @@ version = "^0.2.11" [target.x86_64-apple-darwin.dependencies.fsevent-sys] version = "^0.1" +[target.i686-pc-windows-gnu] +dependencies = { winapi = "0.2", kernel32-sys = "0.2.1" } + +[target.x86_64-pc-windows-gnu] +dependencies = { winapi = "0.2", kernel32-sys = "0.2.1" } + +[target.i686-pc-windows-msvc] +dependencies = { winapi = "0.2", kernel32-sys = "0.2.1" } + +[target.x86_64-pc-windows-msvc] +dependencies = { winapi = "0.2", kernel32-sys = "0.2.1" } + [dev-dependencies] tempfile = "^1.1.0" tempdir = "^0.3.4" diff --git a/appveyor.yml b/appveyor.yml new file mode 100644 index 00000000..9de0be60 --- /dev/null +++ b/appveyor.yml @@ -0,0 +1,19 @@ +environment: + matrix: + - TARGET: x86_64-pc-windows-msvc + - TARGET: i686-pc-windows-msvc + # skip the gnu targets to reduce build time + #- TARGET: x86_64-pc-windows-gnu + #- TARGET: i686-pc-windows-gnu +install: + - ps: Start-FileDownload "https://static.rust-lang.org/dist/rust-nightly-${env:TARGET}.exe" + - rust-nightly-%TARGET%.exe /VERYSILENT /NORESTART /DIR="C:\Program Files (x86)\Rust" + - SET PATH=%PATH%;C:\Program Files (x86)\Rust\bin + - SET PATH=%PATH%;C:\MinGW\bin + - rustc -V + - cargo -V + +build: false + +test_script: + - cargo test --verbose diff --git a/src/lib.rs b/src/lib.rs index ea290ac8..7fd30605 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,7 @@ #[macro_use] extern crate log; #[macro_use] extern crate bitflags; #[cfg(target_os="macos")] extern crate fsevent_sys; +#[cfg(target_os="windows")] extern crate winapi; extern crate libc; extern crate filetime; @@ -12,11 +13,13 @@ use std::convert::AsRef; #[cfg(target_os="macos")] pub use self::fsevent::FsEventWatcher; #[cfg(target_os="linux")] pub use self::inotify::INotifyWatcher; +#[cfg(target_os="windows")] pub use self::windows::ReadDirectoryChangesWatcher; pub use self::null::NullWatcher; pub use self::poll::PollWatcher; #[cfg(target_os="linux")] pub mod inotify; #[cfg(target_os="macos")] pub mod fsevent; +#[cfg(target_os="windows")] pub mod windows; pub mod null; pub mod poll; @@ -57,7 +60,8 @@ pub trait Watcher: Sized { #[cfg(target_os = "linux")] pub type RecommendedWatcher = INotifyWatcher; #[cfg(target_os = "macos")] pub type RecommendedWatcher = FsEventWatcher; -#[cfg(not(any(target_os = "linux", target_os = "macos")))] pub type RecommendedWatcher = PollWatcher; +#[cfg(target_os = "windows")] pub type RecommendedWatcher = ReadDirectoryChangesWatcher; +#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))] pub type RecommendedWatcher = PollWatcher; pub fn new(tx: Sender) -> Result { Watcher::new(tx) diff --git a/src/windows.rs b/src/windows.rs new file mode 100644 index 00000000..3355cb24 --- /dev/null +++ b/src/windows.rs @@ -0,0 +1,410 @@ +extern crate kernel32; + +use winapi::{OVERLAPPED, LPOVERLAPPED, HANDLE, INVALID_HANDLE_VALUE, INFINITE, TRUE, WAIT_OBJECT_0, + ERROR_OPERATION_ABORTED, FILE_NOTIFY_INFORMATION, fileapi, winbase, winnt}; + +use std::collections::HashMap; +use std::mem; +use std::path::{Path, PathBuf}; +use std::ptr; +use std::slice; +use std::sync::mpsc::{channel, Sender, Receiver}; +use std::ffi::OsString; +use std::os::windows::ffi::{OsStrExt, OsStringExt}; +use std::thread; +use std::os::raw::c_void; + +use super::{Event, Error, op, Op, Watcher}; + +const BUF_SIZE: u32 = 16384; + +#[derive(Clone)] +struct ReadData { + dir: PathBuf, // directory that is being watched + file: Option, // if a file is being watched, this is its full path + complete_sem: HANDLE, +} + +struct ReadDirectoryRequest { + tx: Sender, + buffer: [u8; BUF_SIZE as usize], + handle: HANDLE, + data: ReadData +} + +enum Action { + Watch(PathBuf), + Unwatch(PathBuf), + Stop +} + +pub enum MetaEvent { + SingleWatchComplete, + WatcherAwakened +} + +struct WatchState { + dir_handle: HANDLE, + complete_sem: HANDLE, +} + +struct ReadDirectoryChangesServer { + rx: Receiver, + tx: Sender, + meta_tx: Sender, + cmd_tx: Sender>, + watches: HashMap, + wakeup_sem: HANDLE +} + +impl ReadDirectoryChangesServer { + fn start(event_tx: Sender, meta_tx: Sender, cmd_tx:Sender>, wakeup_sem: HANDLE) -> Sender { + + let (action_tx, action_rx) = channel(); + // it is, in fact, ok to send the semaphore across threads + let sem_temp = wakeup_sem as u64; + thread::spawn(move || { + let wakeup_sem = sem_temp as HANDLE; + let server = ReadDirectoryChangesServer { + tx: event_tx, + rx: action_rx, + meta_tx: meta_tx, + cmd_tx: cmd_tx, + watches: HashMap::new(), + wakeup_sem: wakeup_sem + }; + server.run(); + }); + action_tx + } + + fn run(mut self) { + loop { + // process all available actions first + let mut stopped = false; + + while let Ok(action) = self.rx.try_recv() { + match action { + Action::Watch(path) => { + let res = self.add_watch(path); + let _ = self.cmd_tx.send(res); + }, + Action::Unwatch(path) => self.remove_watch(path), + Action::Stop => { + stopped = true; + for (_, ws) in &self.watches { + stop_watch(ws, &self.meta_tx); + } + break; + } + } + }; + + if stopped { + break; + } + + unsafe { + // wait with alertable flag so that the completion routine fires + let waitres = kernel32::WaitForSingleObjectEx(self.wakeup_sem, 500, TRUE); + if waitres == WAIT_OBJECT_0 { + let _ = self.meta_tx.send(MetaEvent::WatcherAwakened); + } + } + } + + // we have to clean this up, since the watcher may be long gone + unsafe { + kernel32::CloseHandle(self.wakeup_sem); + } + } + + fn add_watch(&mut self, path: PathBuf) -> Result { + // path must exist and be either a file or directory + if !path.is_dir() && !path.is_file() { + return Err(Error::Generic("Input watch path is neither a file nor a directory.".to_owned())); + } + + let (watching_file,dir_target) = { + if path.is_dir() { + (false,path.clone()) + } else { + // emulate file watching by watching the parent directory + (true,path.parent().unwrap().to_path_buf()) + } + }; + + let encoded_path: Vec = dir_target.as_os_str().encode_wide().chain(Some(0)).collect(); + let handle; + unsafe { + handle = kernel32::CreateFileW( + encoded_path.as_ptr(), + winnt::FILE_LIST_DIRECTORY, + winnt::FILE_SHARE_READ | winnt::FILE_SHARE_DELETE | winnt::FILE_SHARE_WRITE, + ptr::null_mut(), + fileapi::OPEN_EXISTING, + winbase::FILE_FLAG_BACKUP_SEMANTICS | winbase::FILE_FLAG_OVERLAPPED, + ptr::null_mut()); + + if handle == INVALID_HANDLE_VALUE { + let err = if watching_file { + Err(Error::Generic("You attempted to watch a single file, but parent directory could not be opened.".to_owned())) + } else { + // TODO: Call GetLastError for better error info? + Err(Error::PathNotFound) + }; + return err; + } + } + let wf = if watching_file { + Some(path.clone()) + } else { + None + }; + // every watcher gets its own semaphore to signal completion + let semaphore = unsafe { + kernel32::CreateSemaphoreW(ptr::null_mut(), 0, 1, ptr::null_mut()) + }; + if semaphore == ptr::null_mut() || semaphore == INVALID_HANDLE_VALUE { + unsafe { kernel32::CloseHandle(handle); } + return Err(Error::Generic("Failed to create semaphore for watch.".to_owned())); + } + let rd = ReadData { + dir: dir_target, + file: wf, + complete_sem: semaphore + }; + let ws = WatchState { + dir_handle: handle, + complete_sem: semaphore + }; + self.watches.insert(path.clone(), ws); + start_read(&rd, &self.tx, handle); + Ok(path.to_path_buf()) + } + + fn remove_watch(&mut self, path: PathBuf) { + if let Some(ws) = self.watches.remove(&path) { + stop_watch(&ws, &self.meta_tx); + } + } +} + +fn stop_watch(ws:&WatchState,meta_tx: &Sender) { + unsafe { + let cio = kernel32::CancelIo(ws.dir_handle); + let ch = kernel32::CloseHandle(ws.dir_handle); + // have to wait for it, otherwise we leak the memory allocated for there read request + if cio != 0 && ch != 0 { + kernel32::WaitForSingleObjectEx(ws.complete_sem, INFINITE, TRUE); + } + kernel32::CloseHandle(ws.complete_sem); + + } + let _ = meta_tx.send(MetaEvent::SingleWatchComplete); +} + +fn start_read(rd: &ReadData, tx: &Sender, handle: HANDLE) { + let mut request = Box::new(ReadDirectoryRequest { + tx: tx.clone(), + handle: handle, + buffer: [0u8; BUF_SIZE as usize], + data: rd.clone() + }); + + let flags = winnt::FILE_NOTIFY_CHANGE_FILE_NAME + | winnt::FILE_NOTIFY_CHANGE_DIR_NAME + | winnt::FILE_NOTIFY_CHANGE_ATTRIBUTES + | winnt::FILE_NOTIFY_CHANGE_SIZE + | winnt::FILE_NOTIFY_CHANGE_LAST_WRITE + | winnt::FILE_NOTIFY_CHANGE_CREATION + | winnt::FILE_NOTIFY_CHANGE_SECURITY; + + let monitor_subdir = if (&request.data.file).is_none() { + 1 + } else { + 0 + }; + + unsafe { + let mut overlapped: Box = Box::new(mem::zeroed()); + // When using callback based async requests, we are allowed to use the hEvent member + // for our own purposes + + let req_buf = request.buffer.as_mut_ptr() as *mut c_void; + let request_p = Box::into_raw(request) as *mut c_void; + overlapped.hEvent = request_p; + + // This is using an asynchronous call with a completion routine for receiving notifications + // An I/O completion port would probably be more performant + let ret = kernel32::ReadDirectoryChangesW( + handle, + req_buf, + BUF_SIZE, + monitor_subdir, + flags, + &mut 0u32 as *mut u32, // This parameter is not used for async requests + &mut *overlapped as *mut OVERLAPPED, + Some(handle_event)); + + if ret == 0 { + // error reading. retransmute request memory to allow drop. + // allow overlapped to drop by omitting forget() + let request: Box = mem::transmute(request_p); + + kernel32::ReleaseSemaphore(request.data.complete_sem, 1, ptr::null_mut()); + } else { + // read ok. forget overlapped to let the completion routine handle memory + mem::forget(overlapped); + } + } +} + +unsafe extern "system" fn handle_event(error_code: u32, _bytes_written: u32, overlapped: LPOVERLAPPED) { + let overlapped: Box = Box::from_raw(overlapped); + let request: Box = Box::from_raw(overlapped.hEvent as *mut _); + + if error_code == ERROR_OPERATION_ABORTED { + // received when dir is unwatched or watcher is shutdown; return and let overlapped/request + // get drop-cleaned + kernel32::ReleaseSemaphore(request.data.complete_sem, 1, ptr::null_mut()); + return; + } + + // Get the next request queued up as soon as possible + start_read(&request.data, &request.tx, request.handle); + + // The FILE_NOTIFY_INFORMATION struct has a variable length due to the variable length string + // as its last member. Each struct contains an offset for getting the next entry in the buffer + let mut cur_offset: *const u8 = request.buffer.as_ptr(); + let mut cur_entry: *const FILE_NOTIFY_INFORMATION = mem::transmute(cur_offset); + loop { + // filename length is size in bytes, so / 2 + let len = (*cur_entry).FileNameLength as usize / 2; + let encoded_path: &[u16] = slice::from_raw_parts((*cur_entry).FileName.as_ptr(), len); + // prepend root to get a full path + let path = request.data.dir.join(PathBuf::from(OsString::from_wide(encoded_path))); + + // if we are watching a single file, ignore the event unless the path is exactly + // the watched file + let skip = match request.data.file { + None => false, + Some(ref watch_path) => *watch_path != path + }; + + if !skip { + let op = match (*cur_entry).Action { + winnt::FILE_ACTION_ADDED => op::CREATE, + winnt::FILE_ACTION_REMOVED => op::REMOVE, + winnt::FILE_ACTION_MODIFIED => op::WRITE, + winnt::FILE_ACTION_RENAMED_OLD_NAME | winnt::FILE_ACTION_RENAMED_NEW_NAME => op::RENAME, + _ => Op::empty() + }; + + + let evt = Event { + path: Some(path), + op: Ok(op) + }; + let _ = request.tx.send(evt); + } + + if (*cur_entry).NextEntryOffset == 0 { + break; + } + cur_offset = cur_offset.offset((*cur_entry).NextEntryOffset as isize); + cur_entry = mem::transmute(cur_offset); + } +} + +pub struct ReadDirectoryChangesWatcher { + tx: Sender, + cmd_rx: Receiver>, + wakeup_sem: HANDLE +} + +impl ReadDirectoryChangesWatcher { + pub fn create(event_tx: Sender, meta_tx: Sender) -> Result { + let (cmd_tx, cmd_rx) = channel(); + + let wakeup_sem = unsafe { + kernel32::CreateSemaphoreW(ptr::null_mut(), 0, 1, ptr::null_mut()) + }; + if wakeup_sem == ptr::null_mut() || wakeup_sem == INVALID_HANDLE_VALUE { + return Err(Error::Generic("Failed to create wakeup semaphore.".to_owned())); + } + + let action_tx = ReadDirectoryChangesServer::start(event_tx,meta_tx,cmd_tx,wakeup_sem); + + Ok(ReadDirectoryChangesWatcher { + tx: action_tx, + cmd_rx: cmd_rx, + wakeup_sem: wakeup_sem + }) + } + + fn wakeup_server(&mut self) { + // breaks the server out of its wait state. right now this is really just an optimization, + // so that if you add a watch you don't block for 500ms in watch() while the + // server sleeps. + unsafe { kernel32::ReleaseSemaphore(self.wakeup_sem, 1, ptr::null_mut()); } + } + + fn send_action_require_ack(&mut self, action:Action, pb:&PathBuf) -> Result<(), Error> { + match self.tx.send(action) { + Err(_) => Err(Error::Generic("Error sending to internal channel".to_owned())), + Ok(_) => { + // wake 'em up, we don't want to wait around for the ack + self.wakeup_server(); + + match self.cmd_rx.recv() { + Err(_) => Err(Error::Generic("Error receiving from command channel".to_owned())), + Ok(ack_res) => { + match ack_res { + Err(e) => Err(Error::Generic(format!("Error in watcher: {:?}", e))), + Ok(ack_pb) => { + if pb.as_path() != ack_pb.as_path() { + Err(Error::Generic(format!("Expected ack for {:?} but got ack for {:?}", pb, ack_pb))) + } else { + Ok(()) + } + } + } + } + } + } + } + } +} + +impl Watcher for ReadDirectoryChangesWatcher { + fn new(event_tx: Sender) -> Result { + // create dummy channel for meta event + let (meta_tx, _) = channel(); + ReadDirectoryChangesWatcher::create(event_tx, meta_tx) + } + + fn watch>(&mut self, path: P) -> Result<(), Error> { + // path must exist and be either a file or directory + let pb = path.as_ref().to_path_buf(); + if !pb.is_dir() && !pb.is_file() { + return Err(Error::Generic("Input watch path is neither a file nor a directory.".to_owned())); + } + self.send_action_require_ack(Action::Watch(path.as_ref().to_path_buf()), &pb) + } + + fn unwatch>(&mut self, path: P) -> Result<(), Error> { + let res = self.tx.send(Action::Unwatch(path.as_ref().to_path_buf())) + .map_err(|_| Error::Generic("Error sending to internal channel".to_owned())); + self.wakeup_server(); + res + } +} + +impl Drop for ReadDirectoryChangesWatcher { + fn drop(&mut self) { + let _ = self.tx.send(Action::Stop); + // better wake it up + self.wakeup_server(); + } +} diff --git a/tests/notify.rs b/tests/notify.rs index e46d928b..8a47fb93 100644 --- a/tests/notify.rs +++ b/tests/notify.rs @@ -53,31 +53,77 @@ fn resolve_path(path: &Path) -> PathBuf { PathBuf::from(p) } -fn validate_recv(rx: Receiver, evs: Vec<(&Path, Op)>) { +fn validate_recv(rx: Receiver, evs: Vec<(&Path, Op)>) -> Vec { let deadline = time::precise_time_s() + TIMEOUT_S; let mut evs = evs.clone(); + let mut received_events:Vec = Vec::new(); + while time::precise_time_s() < deadline { if let Ok(actual) = rx.try_recv() { let path = actual.path.clone().unwrap(); - let op = actual.op.unwrap().clone(); - let mut removables = vec!(); - for i in (0..evs.len()) { - let expected = evs.get(i).unwrap(); - if path.clone().as_path() == expected.0 && op.contains(expected.1) { - removables.push(i); + match actual.op { + Err(e) => panic!("unexpected err: {:?}", e), + Ok(op) => { + let mut removables = vec!(); + for i in (0..evs.len()) { + let expected = evs.get(i).unwrap(); + if path.clone().as_path() == expected.0 && op.contains(expected.1) { + removables.push(i); + } + } + for removable in removables { + evs.remove(removable); + } } } - for removable in removables { - evs.remove(removable); - } + + received_events.push(actual); } if evs.is_empty() { break; } } assert!(evs.is_empty(), "Some expected events did not occur before the test timedout:\n\t\t{:?}", evs); + + received_events +} + +#[cfg(target_os = "windows")] +// Windows needs to test this differently since it can't watch files that don't exist yet. +fn validate_watch_single_file(ctor: F) where + F: Fn(Sender) -> Result, W: Watcher { + + let (tx, rx) = channel(); + let mut w = ctor(tx).unwrap(); + + // While the file is open, windows won't report modified events for it. + // Flushing doesn't help. So make sure it is closed before we validate. + let path = { + let mut file = NamedTempFile::new().unwrap(); + w.watch(file.path()).unwrap(); + thread::sleep_ms(1000); // give watcher enough time to spin up + + // make some files that should be exlcuded from watch. this works because tempfile creates + // them all in the same directory. + let mut excluded_file = NamedTempFile::new().unwrap(); + let another_excluded_file = NamedTempFile::new().unwrap(); + let _ = another_excluded_file; // eliminate warning + excluded_file.write_all(b"shouldn't get an event for this").unwrap(); + + file.write_all(b"foo").unwrap(); + file.flush().unwrap(); + file.path().to_path_buf() + }; + let events = validate_recv(rx, vec![(path.as_path(), op::WRITE), + (path.as_path(), op::REMOVE)]); + + // make sure that ONLY the target path is in the list of received events + for evt in events { + assert!(evt.path.unwrap() == path.as_path()); + } } +#[cfg(not(target_os = "windows"))] fn validate_watch_single_file(ctor: F) where F: Fn(Sender) -> Result, W: Watcher { let mut file = NamedTempFile::new().unwrap(); @@ -103,6 +149,7 @@ fn validate_watch_dir(ctor: F) where let mut w = ctor(tx).unwrap(); w.watch(dir.path()).unwrap(); + let f111 = NamedTempFile::new_in(dir11.path()).unwrap(); let f111_path = f111.path().to_owned(); let f111_path = f111_path.as_path(); diff --git a/tests/windows.rs b/tests/windows.rs new file mode 100644 index 00000000..f1ad84e8 --- /dev/null +++ b/tests/windows.rs @@ -0,0 +1,138 @@ +extern crate notify; +extern crate tempdir; +extern crate tempfile; +extern crate time; + +use notify::*; +use std::thread; +use std::sync::mpsc::{channel, Receiver}; +use tempdir::TempDir; + +fn check_for_error(rx:&Receiver) { + while let Ok(res) = rx.try_recv() { + match res.op { + Err(e) => panic!("unexpected err: {:?}: {:?}", e, res.path), + _ => () + } + }; +} +#[cfg(target_os="windows")] +#[test] +fn shutdown() { + // create a watcher for n directories. start the watcher, then shut it down. inspect + // the watcher to make sure that it received final callbacks for all n watchers. + let dir_count = 100; + + // to get meta events, we have to pass in the meta channel + let (meta_tx,meta_rx) = channel(); + let (tx, rx) = channel(); + { + let mut dirs:Vec = Vec::new(); + let mut w = ReadDirectoryChangesWatcher::create(tx,meta_tx).unwrap(); + + for _ in 0..dir_count { + let d = TempDir::new("rsnotifytest").unwrap(); + dirs.push(d); + } + + for d in &dirs { // need the ref, otherwise its a move and the dir will be dropped! + //println!("{:?}", d.path()); + w.watch(d.path()).unwrap(); + } + + // unwatch half of the directories, let the others get stopped when we go out of scope + for d in &dirs[0..dir_count/2] { + w.unwatch(d.path()).unwrap(); + } + + thread::sleep_ms(2000); // sleep to unhook the watches + } + + check_for_error(&rx); + + const TIMEOUT_S: f64 = 60.0; // give it PLENTY of time before we declare failure + let deadline = time::precise_time_s() + TIMEOUT_S; + let mut watchers_shutdown = 0; + while watchers_shutdown != dir_count && time::precise_time_s() < deadline { + if let Ok(actual) = meta_rx.try_recv() { + match actual { + notify::windows::MetaEvent::SingleWatchComplete => watchers_shutdown += 1, + _ => () + } + } + thread::sleep_ms(50); // don't burn cpu, can take some time for completion events to fire + } + + assert_eq!(watchers_shutdown,dir_count); +} + +#[cfg(target_os="windows")] +#[test] +fn watch_deleted_fails() { + let pb = { + let d = TempDir::new("rsnotifytest").unwrap(); + d.path().to_path_buf() + }; + + let (tx, _) = channel(); + let mut w = ReadDirectoryChangesWatcher::new(tx).unwrap(); + match w.watch(pb.as_path()) { + Ok(x) => panic!("Should have failed, but got: {:?}", x), + Err(_) => () + } +} + +#[cfg(target_os="windows")] +#[test] +fn watch_server_can_be_awakened() { + let (tx, _) = channel(); + let (meta_tx,meta_rx) = channel(); + let mut w = ReadDirectoryChangesWatcher::create(tx,meta_tx).unwrap(); + let d = TempDir::new("rsnotifytest").unwrap(); + let d2 = TempDir::new("rsnotifytest").unwrap(); + + match w.watch(d.path()) { + Ok(_) => (), + Err(e) => panic!("Oops: {:?}", e) + } + match w.watch(d2.path()) { + Ok(_) => (), + Err(e) => panic!("Oops: {:?}", e) + } + // should be at least one awaken in there + const TIMEOUT_S: f64 = 5.0; + let deadline = time::precise_time_s() + TIMEOUT_S; + let mut awakened = false; + while time::precise_time_s() < deadline { + if let Ok(actual) = meta_rx.try_recv() { + match actual { + notify::windows::MetaEvent::WatcherAwakened => awakened = true, + _ => () + } + } + thread::sleep_ms(50); + } + + if !awakened { + panic!("Failed to awaken"); + } +} + +#[cfg(target_os="windows")] +#[test] +#[ignore] +// repeatedly watch and unwatch a directory; make sure process memory does not increase. +// you use task manager to watch the memory; it will fluctuate a bit, but should not leak overall +fn memtest_manual() { + loop { + let (tx, rx) = channel(); + let d = TempDir::new("rsnotifytest").unwrap(); + { + let (meta_tx,_) = channel(); + let mut w = ReadDirectoryChangesWatcher::create(tx,meta_tx).unwrap(); + w.watch(d.path()).unwrap(); + thread::sleep_ms(1); // this should make us run pretty hot but not insane + } + check_for_error(&rx); + } +}