From f24f3fe2f93f91bd84283477ebe8a756dff6a70c Mon Sep 17 00:00:00 2001 From: Michael Maurizi Date: Sun, 7 Jun 2015 16:14:56 -0400 Subject: [PATCH 01/24] WIP --- Cargo.toml | 8 +- src/lib.rs | 6 +- src/windows.rs | 228 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 240 insertions(+), 2 deletions(-) create mode 100644 src/windows.rs diff --git a/Cargo.toml b/Cargo.toml index b3bd6dcb..e4ed6b0d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ authors = [ "Félix Saparelli ", "Antti Keränen ", "Jorge Israel Peña ", - "Michael Maurizi ", + "Michael Maurizi ", "Pierre Baillet ", "ShuYu Wang ", ] @@ -35,3 +35,9 @@ version = "^0.1" [target.x86_64-apple-darwin.dependencies.fsevent-sys] version = "^0.1" + +[target.i686-pc-windows-gnu] +dependencies = { winapi = "0.1", kernel32-sys = "0.1" } + +[target.x86_64-pc-windows-gnu] +dependencies = { winapi = "0.1", kernel32-sys = "0.1" } diff --git a/src/lib.rs b/src/lib.rs index 22f0abe5..15a587c3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,7 @@ #[macro_use] extern crate log; #[macro_use] extern crate bitflags; #[cfg(target_os="macos")] extern crate fsevent_sys; +#[cfg(target_os="windows")] extern crate winapi; extern crate libc; pub use self::op::Op; @@ -11,10 +12,12 @@ use std::sync::mpsc::Sender; #[cfg(target_os="macos")] pub use self::fsevent::FsEventWatcher; #[cfg(target_os="linux")] pub use self::inotify::INotifyWatcher; +#[cfg(target_os="windows")] pub use self::windows::ReadDirectoryChangesWatcher; pub use self::null::NullWatcher; #[cfg(target_os="linux")] pub mod inotify; #[cfg(target_os="macos")] pub mod fsevent; +#[cfg(target_os="windows")] pub mod windows; pub mod null; pub mod op { @@ -53,7 +56,8 @@ pub trait Watcher { #[cfg(target_os = "linux")] pub type RecommendedWatcher = INotifyWatcher; #[cfg(target_os = "macos")] pub type RecommendedWatcher = FsEventWatcher; -#[cfg(not(any(target_os = "linux", target_os = "macos")))] pub type RecommendedWatcher = NullWatcher; +#[cfg(target_os = "windows")] pub type RecommendedWatcher = ReadDirectoryChangesWatcher; +#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))] pub type RecommendedWatcher = NullWatcher; pub fn new(tx: Sender) -> Result { Watcher::new(tx) diff --git a/src/windows.rs b/src/windows.rs new file mode 100644 index 00000000..8475160e --- /dev/null +++ b/src/windows.rs @@ -0,0 +1,228 @@ +extern crate libc; +extern crate kernel32; +extern crate winapi; + +use libc::c_void; + +use winapi::{HANDLE, INVALID_HANDLE_VALUE, fileapi, winbase, winnt}; +use winapi::minwinbase::{OVERLAPPED, LPOVERLAPPED}; +use winapi::winerror::ERROR_OPERATION_ABORTED; +use winapi::winnt::FILE_NOTIFY_INFORMATION; + +use std::collections::HashMap; +use std::mem; +use std::path::{Path, PathBuf}; +use std::ptr; +use std::slice; +use std::sync::mpsc::{channel, Sender, Receiver}; +use std::ffi::OsString; +use std::os::windows::ffi::{OsStrExt, OsStringExt}; +use std::thread; + +use super::{Event, Error, op, Op, Watcher}; + +const BUF_SIZE: u32 = 16384; + +struct ReadDirectoryRequest { + tx: Sender, + buffer: [u8; BUF_SIZE as usize], + handle: HANDLE +} + +enum Action { + Watch(PathBuf), + Unwatch(PathBuf), + Stop +} + +struct ReadDirectoryChangesServer { + rx: Receiver, + tx: Sender, + watches: HashMap, +} + +impl ReadDirectoryChangesServer { + fn start(event_tx: Sender) -> Sender { + let (action_tx, action_rx) = channel(); + thread::spawn(move || { + let server = ReadDirectoryChangesServer { + tx: event_tx, + rx: action_rx, + watches: HashMap::new() + }; + server.run(); + }); + action_tx + } + + fn run(mut self) { + while let Ok(action) = self.rx.recv() { + match action { + Action::Watch(path) => self.add_watch(path), + Action::Unwatch(path) => self.remove_watch(path), + Action::Stop => { + for (_, handle) in self.watches { + unsafe { + close_handle(handle); + } + } + break + } + } + } + } + + fn add_watch(&mut self, path: PathBuf) { + let encoded_path: Vec = path.as_os_str().encode_wide().chain(Some(0)).collect(); + let handle; + unsafe { + handle = kernel32::CreateFileW( + encoded_path.as_ptr(), + winnt::FILE_LIST_DIRECTORY, + winnt::FILE_SHARE_READ | winnt::FILE_SHARE_DELETE, + ptr::null_mut(), + fileapi::OPEN_EXISTING, + winbase::FILE_FLAG_BACKUP_SEMANTICS | winbase::FILE_FLAG_OVERLAPPED, + ptr::null_mut()); + + if handle == INVALID_HANDLE_VALUE { + let _ = self.tx.send(Event { + path: None, + // TODO: Call GetLastError for better error info? + op: Err(Error::PathNotFound) + }); + return; + } + } + self.watches.insert(path, handle); + start_read(&self.tx, handle); + } + + fn remove_watch(&mut self, path: PathBuf) { + if let Some(handle) = self.watches.remove(&path) { + unsafe { + close_handle(handle); + } + } + } +} + +unsafe fn close_handle(handle: HANDLE) { + // TODO: Handle errors + kernel32::CancelIo(handle); + kernel32::CloseHandle(handle); +} + +fn start_read(tx: &Sender, handle: HANDLE) { + let mut request = Box::new(ReadDirectoryRequest { + tx: tx.clone(), + handle: handle, + buffer: [0u8; BUF_SIZE as usize] + }); + + let flags = winnt::FILE_NOTIFY_CHANGE_FILE_NAME + | winnt::FILE_NOTIFY_CHANGE_DIR_NAME + | winnt::FILE_NOTIFY_CHANGE_ATTRIBUTES + | winnt::FILE_NOTIFY_CHANGE_SIZE + | winnt::FILE_NOTIFY_CHANGE_LAST_WRITE + | winnt::FILE_NOTIFY_CHANGE_CREATION + | winnt::FILE_NOTIFY_CHANGE_SECURITY; + + let request_p = &mut request as *mut _ as *mut c_void; + + unsafe { + let mut overlapped: Box = Box::new(mem::zeroed()); + // When using callback based async requests, we are allowed to use the hEvent member + // for our own purposes + overlapped.hEvent = request_p; + + // This is using an asynchronous call with a completion routine for receiving notifications + // An I/O completion port would probably be more performant + kernel32::ReadDirectoryChangesW( + handle, + request.buffer.as_mut_ptr() as *mut c_void, + BUF_SIZE, + 1, // We do want to monitor subdirectories + flags, + &mut 0u32 as *mut u32, // This parameter is not used for async requests + &mut *overlapped as *mut OVERLAPPED, + Some(handle_event)); + + mem::forget(overlapped); + mem::forget(request); + } +} + +unsafe extern "system" fn handle_event(error_code: u32, _bytes_written: u32, overlapped: LPOVERLAPPED) { + // TODO: Use Box::from_raw when it is no longer unstable + let overlapped: Box = mem::transmute(overlapped); + let request: Box = mem::transmute(overlapped.hEvent); + + if error_code == ERROR_OPERATION_ABORTED { + // We receive this error when the directory for this request is unwatched? + return; + } + + // Get the next request queued up as soon as possible + start_read(&request.tx, request.handle); + + // The FILE_NOTIFY_INFORMATION struct has a variable length due to the variable length string + // as its last member. Each struct contains an offset for getting the next entry in the buffer + let mut cur_offset: *const u8 = request.buffer.as_ptr(); + let mut cur_entry: *const FILE_NOTIFY_INFORMATION = mem::transmute(cur_offset); + loop { + let encoded_path: &[u16] = slice::from_raw_parts((*cur_entry).FileName.as_ptr(), (*cur_entry).FileNameLength as usize); + let path = PathBuf::from(OsString::from_wide(encoded_path)); + + let op = match (*cur_entry).Action { + winnt::FILE_ACTION_ADDED => op::CREATE, + winnt::FILE_ACTION_REMOVED => op::REMOVE, + winnt::FILE_ACTION_MODIFIED => op::WRITE, + winnt::FILE_ACTION_RENAMED_OLD_NAME | winnt::FILE_ACTION_RENAMED_NEW_NAME => op::RENAME, + _ => Op::empty() + }; + + let _ = request.tx.send(Event { + path: Some(path), + op: Ok(op) + }); + + if (*cur_entry).NextEntryOffset == 0 { + break; + } + cur_offset = cur_offset.offset((*cur_entry).NextEntryOffset as isize); + cur_entry = mem::transmute(cur_offset); + } +} + +pub struct ReadDirectoryChangesWatcher { + tx: Sender +} + +impl Watcher for ReadDirectoryChangesWatcher { + fn new(event_tx: Sender) -> Result { + let action_tx = ReadDirectoryChangesServer::start(event_tx); + + return Ok(ReadDirectoryChangesWatcher { + tx: action_tx + }); + } + + fn watch(&mut self, path: &Path) -> Result<(), Error> { + // TODO: Add SendError to notify::Error and use try!(...)? + self.tx.send(Action::Watch(path.to_path_buf())); + Ok(()) + } + + fn unwatch(&mut self, path: &Path) -> Result<(), Error> { + // TODO: Add SendError to notify::Error and use try!(...)? + self.tx.send(Action::Unwatch(path.to_path_buf())); + Ok(()) + } +} + +impl Drop for ReadDirectoryChangesWatcher { + fn drop(&mut self) { + let _ = self.tx.send(Action::Stop); + } +} From 79b878515c0eb70587ef19106b7995f4706b4a92 Mon Sep 17 00:00:00 2001 From: John Quigley Date: Tue, 17 Nov 2015 00:20:36 -0500 Subject: [PATCH 02/24] Windows: use SleepEx to allow completion routine to fire. Fix bad transmute on request pointer. - This switches the main loop to alternate between waiting for action events and blocking on SleepEx. The SleepEx block allows completion events to fire. - This also fixes a failed transmute in the completion callback. Basically instead of using the request_p casted to &mut c_void, we need to transmute it and pass that into ReadDirectoryChangesW. Otherwise, when the completion routine attempts to transmute it back, it will be crap. - Still need to fix some todos, so this is still WIP. --- Cargo.toml | 7 +++-- src/lib.rs | 3 ++ src/windows.rs | 81 +++++++++++++++++++++++++++++++++++++++----------- 3 files changed, 72 insertions(+), 19 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e4ed6b0d..0a7b0f16 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,10 @@ version = "^0.1" version = "^0.1" [target.i686-pc-windows-gnu] -dependencies = { winapi = "0.1", kernel32-sys = "0.1" } +dependencies = { winapi = "0.2", kernel32-sys = "0.2.1" } [target.x86_64-pc-windows-gnu] -dependencies = { winapi = "0.1", kernel32-sys = "0.1" } +dependencies = { winapi = "0.2", kernel32-sys = "0.2.1" } + +[target.x86_64-pc-windows-msvc] +dependencies = { winapi = "0.2", kernel32-sys = "0.2.1" } diff --git a/src/lib.rs b/src/lib.rs index 15a587c3..89696534 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,5 @@ +#![feature(mpsc_select)] + #[macro_use] extern crate log; #[macro_use] extern crate bitflags; #[cfg(target_os="macos")] extern crate fsevent_sys; @@ -32,6 +34,7 @@ pub mod op { } } +#[derive(Debug)] pub struct Event { pub path: Option, pub op: Result, diff --git a/src/windows.rs b/src/windows.rs index 8475160e..33d5a1d1 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -2,8 +2,6 @@ extern crate libc; extern crate kernel32; extern crate winapi; -use libc::c_void; - use winapi::{HANDLE, INVALID_HANDLE_VALUE, fileapi, winbase, winnt}; use winapi::minwinbase::{OVERLAPPED, LPOVERLAPPED}; use winapi::winerror::ERROR_OPERATION_ABORTED; @@ -14,10 +12,11 @@ use std::mem; use std::path::{Path, PathBuf}; use std::ptr; use std::slice; -use std::sync::mpsc::{channel, Sender, Receiver}; +use std::sync::mpsc::{channel, Sender, Receiver, RecvError}; use std::ffi::OsString; use std::os::windows::ffi::{OsStrExt, OsStringExt}; use std::thread; +use std::os::raw::c_void; use super::{Event, Error, op, Op, Watcher}; @@ -35,12 +34,27 @@ enum Action { Stop } +enum SelectResult { + Timeout, + Action(Action), + Error(RecvError) +} + struct ReadDirectoryChangesServer { rx: Receiver, tx: Sender, watches: HashMap, } +fn oneshot_timer(ms: u32) -> Receiver<()> { + let (tx, rx) = channel(); + thread::spawn(move || { + thread::sleep_ms(ms); + tx.send(()); + }); + rx +} + impl ReadDirectoryChangesServer { fn start(event_tx: Sender) -> Sender { let (action_tx, action_rx) = channel(); @@ -56,19 +70,48 @@ impl ReadDirectoryChangesServer { } fn run(mut self) { - while let Ok(action) = self.rx.recv() { - match action { - Action::Watch(path) => self.add_watch(path), - Action::Unwatch(path) => self.remove_watch(path), - Action::Stop => { - for (_, handle) in self.watches { - unsafe { - close_handle(handle); + loop { + let timeout = oneshot_timer(500); // TODO: dumb, spawns a thread every time + + // have to pull the result out of the select! this way, because it + // can't parse self.rx.recv() in its arguments. + let res = { + let rx = &self.rx; + + select! ( + _ = timeout.recv() => SelectResult::Timeout, // TODO: timeout error? + action_res = rx.recv() => { + match action_res { + Err(e) => SelectResult::Error(e), + Ok(action) => SelectResult::Action(action) + } + } + ) + }; + + match res { + SelectResult::Timeout => (), + SelectResult::Error(e) => panic!("watcher error: {:?}", e), // TODO: what to do? + SelectResult::Action(action) => { + match action { + Action::Watch(path) => self.add_watch(path), + Action::Unwatch(path) => self.remove_watch(path), + Action::Stop => { + for (_, handle) in self.watches { + unsafe { + close_handle(handle); + } + } + break } } - break } } + + // call sleepex with alertable flag so that our completion routine fires + unsafe { + kernel32::SleepEx(500, 1); + } } } @@ -128,19 +171,20 @@ fn start_read(tx: &Sender, handle: HANDLE) { | winnt::FILE_NOTIFY_CHANGE_CREATION | winnt::FILE_NOTIFY_CHANGE_SECURITY; - let request_p = &mut request as *mut _ as *mut c_void; - unsafe { let mut overlapped: Box = Box::new(mem::zeroed()); // When using callback based async requests, we are allowed to use the hEvent member // for our own purposes + + let req_buf = request.buffer.as_mut_ptr() as *mut c_void; + let request_p: *mut c_void = mem::transmute(request); overlapped.hEvent = request_p; // This is using an asynchronous call with a completion routine for receiving notifications // An I/O completion port would probably be more performant kernel32::ReadDirectoryChangesW( handle, - request.buffer.as_mut_ptr() as *mut c_void, + req_buf, BUF_SIZE, 1, // We do want to monitor subdirectories flags, @@ -149,7 +193,6 @@ fn start_read(tx: &Sender, handle: HANDLE) { Some(handle_event)); mem::forget(overlapped); - mem::forget(request); } } @@ -171,7 +214,9 @@ unsafe extern "system" fn handle_event(error_code: u32, _bytes_written: u32, ove let mut cur_offset: *const u8 = request.buffer.as_ptr(); let mut cur_entry: *const FILE_NOTIFY_INFORMATION = mem::transmute(cur_offset); loop { - let encoded_path: &[u16] = slice::from_raw_parts((*cur_entry).FileName.as_ptr(), (*cur_entry).FileNameLength as usize); + // filename length is size in bytes, so / 2 + let len = (*cur_entry).FileNameLength as usize / 2; + let encoded_path: &[u16] = slice::from_raw_parts((*cur_entry).FileName.as_ptr(), len); let path = PathBuf::from(OsString::from_wide(encoded_path)); let op = match (*cur_entry).Action { @@ -193,6 +238,8 @@ unsafe extern "system" fn handle_event(error_code: u32, _bytes_written: u32, ove cur_offset = cur_offset.offset((*cur_entry).NextEntryOffset as isize); cur_entry = mem::transmute(cur_offset); } + + // TODO: need to free request and overlapped } pub struct ReadDirectoryChangesWatcher { From 1592724423cbcd3e8fa0a32cf37ef86d61fa0b38 Mon Sep 17 00:00:00 2001 From: John Quigley Date: Tue, 17 Nov 2015 15:12:55 -0500 Subject: [PATCH 03/24] Windows: use full paths in reported events --- src/windows.rs | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/src/windows.rs b/src/windows.rs index 56c8a587..6d87424a 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -25,7 +25,8 @@ const BUF_SIZE: u32 = 16384; struct ReadDirectoryRequest { tx: Sender, buffer: [u8; BUF_SIZE as usize], - handle: HANDLE + handle: HANDLE, + root: PathBuf } enum Action { @@ -137,8 +138,8 @@ impl ReadDirectoryChangesServer { return; } } - self.watches.insert(path, handle); - start_read(&self.tx, handle); + self.watches.insert(path.clone(), handle); + start_read(&path, &self.tx, handle); } fn remove_watch(&mut self, path: PathBuf) { @@ -156,11 +157,12 @@ unsafe fn close_handle(handle: HANDLE) { kernel32::CloseHandle(handle); } -fn start_read(tx: &Sender, handle: HANDLE) { +fn start_read(path: &PathBuf, tx: &Sender, handle: HANDLE) { let mut request = Box::new(ReadDirectoryRequest { tx: tx.clone(), handle: handle, - buffer: [0u8; BUF_SIZE as usize] + buffer: [0u8; BUF_SIZE as usize], + root: path.clone() }); let flags = winnt::FILE_NOTIFY_CHANGE_FILE_NAME @@ -207,7 +209,7 @@ unsafe extern "system" fn handle_event(error_code: u32, _bytes_written: u32, ove } // Get the next request queued up as soon as possible - start_read(&request.tx, request.handle); + start_read(&request.root, &request.tx, request.handle); // The FILE_NOTIFY_INFORMATION struct has a variable length due to the variable length string // as its last member. Each struct contains an offset for getting the next entry in the buffer @@ -217,7 +219,8 @@ unsafe extern "system" fn handle_event(error_code: u32, _bytes_written: u32, ove // filename length is size in bytes, so / 2 let len = (*cur_entry).FileNameLength as usize / 2; let encoded_path: &[u16] = slice::from_raw_parts((*cur_entry).FileName.as_ptr(), len); - let path = PathBuf::from(OsString::from_wide(encoded_path)); + // prepend root to get a full path + let path = request.root.join(PathBuf::from(OsString::from_wide(encoded_path))); let op = match (*cur_entry).Action { winnt::FILE_ACTION_ADDED => op::CREATE, @@ -227,10 +230,12 @@ unsafe extern "system" fn handle_event(error_code: u32, _bytes_written: u32, ove _ => Op::empty() }; - let _ = request.tx.send(Event { + + let evt = Event { path: Some(path), op: Ok(op) - }); + }; + request.tx.send(evt).ok().expect("error while sending event"); if (*cur_entry).NextEntryOffset == 0 { break; From 124652c1e2205b8155cbde3f452627fa67ed1c27 Mon Sep 17 00:00:00 2001 From: John Quigley Date: Tue, 17 Nov 2015 18:08:46 -0500 Subject: [PATCH 04/24] Windows: use try_recv instead of select!, process all actions in loop --- src/lib.rs | 2 -- src/windows.rs | 67 ++++++++++++++------------------------------------ 2 files changed, 19 insertions(+), 50 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 79aed46d..7fd30605 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,3 @@ -#![feature(mpsc_select)] - #[macro_use] extern crate log; #[macro_use] extern crate bitflags; #[cfg(target_os="macos")] extern crate fsevent_sys; diff --git a/src/windows.rs b/src/windows.rs index 6d87424a..62cc1632 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -12,7 +12,7 @@ use std::mem; use std::path::{Path, PathBuf}; use std::ptr; use std::slice; -use std::sync::mpsc::{channel, Sender, Receiver, RecvError}; +use std::sync::mpsc::{channel, Sender, Receiver}; use std::ffi::OsString; use std::os::windows::ffi::{OsStrExt, OsStringExt}; use std::thread; @@ -35,25 +35,10 @@ enum Action { Stop } -enum SelectResult { - Timeout, - Action(Action), - Error(RecvError) -} - struct ReadDirectoryChangesServer { rx: Receiver, tx: Sender, - watches: HashMap, -} - -fn oneshot_timer(ms: u32) -> Receiver<()> { - let (tx, rx) = channel(); - thread::spawn(move || { - thread::sleep_ms(ms); - tx.send(()); - }); - rx + watches: HashMap } impl ReadDirectoryChangesServer { @@ -72,41 +57,27 @@ impl ReadDirectoryChangesServer { fn run(mut self) { loop { - let timeout = oneshot_timer(500); // TODO: dumb, spawns a thread every time - - // have to pull the result out of the select! this way, because it - // can't parse self.rx.recv() in its arguments. - let res = { - let rx = &self.rx; - - select! ( - _ = timeout.recv() => SelectResult::Timeout, // TODO: timeout error? - action_res = rx.recv() => { - match action_res { - Err(e) => SelectResult::Error(e), - Ok(action) => SelectResult::Action(action) - } - } - ) - }; - - match res { - SelectResult::Timeout => (), - SelectResult::Error(e) => panic!("watcher error: {:?}", e), // TODO: what to do? - SelectResult::Action(action) => { - match action { - Action::Watch(path) => self.add_watch(path), - Action::Unwatch(path) => self.remove_watch(path), - Action::Stop => { - for (_, handle) in self.watches { - unsafe { - close_handle(handle); - } + // process all available actions first + let mut stopped = false; + + while let Ok(action) = self.rx.try_recv() { + match action { + Action::Watch(path) => self.add_watch(path), + Action::Unwatch(path) => self.remove_watch(path), + Action::Stop => { + stopped = true; + for (_, handle) in &self.watches { + unsafe { + close_handle(*handle); } - break } + break; } } + }; + + if stopped { + break; } // call sleepex with alertable flag so that our completion routine fires From be214c854523e728588c4e98fdc70142018f46dc Mon Sep 17 00:00:00 2001 From: John Quigley Date: Tue, 17 Nov 2015 18:09:41 -0500 Subject: [PATCH 05/24] Windows: fix watch_dir_recommended test by not panicking if channel is closed - Most of the other watchers don't panic on send failure --- src/windows.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/windows.rs b/src/windows.rs index 62cc1632..b13708d4 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -206,7 +206,7 @@ unsafe extern "system" fn handle_event(error_code: u32, _bytes_written: u32, ove path: Some(path), op: Ok(op) }; - request.tx.send(evt).ok().expect("error while sending event"); + let _ = request.tx.send(evt); if (*cur_entry).NextEntryOffset == 0 { break; From 1764e373e5405dc0a1b2eb31870c0a802494d4b5 Mon Sep 17 00:00:00 2001 From: John Quigley Date: Tue, 17 Nov 2015 18:25:00 -0500 Subject: [PATCH 06/24] Windows: don't need to free overlapped/request in handler - Transmute converts them back into a droppable resource --- src/windows.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/windows.rs b/src/windows.rs index b13708d4..e028c552 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -214,8 +214,6 @@ unsafe extern "system" fn handle_event(error_code: u32, _bytes_written: u32, ove cur_offset = cur_offset.offset((*cur_entry).NextEntryOffset as isize); cur_entry = mem::transmute(cur_offset); } - - // TODO: need to free request and overlapped } pub struct ReadDirectoryChangesWatcher { From 8d2167fb01aaccec695cf9a9d1719ce1640925df Mon Sep 17 00:00:00 2001 From: John Quigley Date: Tue, 17 Nov 2015 18:37:23 -0500 Subject: [PATCH 07/24] Windows: convert watch/unwatch send errors to Error::Generic - If these occur they are internal errors, so Generic seems sufficient --- src/windows.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/windows.rs b/src/windows.rs index e028c552..19931a58 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -230,15 +230,13 @@ impl Watcher for ReadDirectoryChangesWatcher { } fn watch>(&mut self, path: P) -> Result<(), Error> { - // TODO: Add SendError to notify::Error and use try!(...)? - self.tx.send(Action::Watch(path.as_ref().to_path_buf())); - Ok(()) + self.tx.send(Action::Watch(path.as_ref().to_path_buf())) + .map_err(|_| Error::Generic("Error sending to internal channel".to_owned())) } fn unwatch>(&mut self, path: P) -> Result<(), Error> { - // TODO: Add SendError to notify::Error and use try!(...)? - self.tx.send(Action::Unwatch(path.as_ref().to_path_buf())); - Ok(()) + self.tx.send(Action::Unwatch(path.as_ref().to_path_buf())) + .map_err(|_| Error::Generic("Error sending to internal channel".to_owned())) } } From 12f0ae4b31f0496d15b3fd509091b0f4b3791f85 Mon Sep 17 00:00:00 2001 From: John Quigley Date: Tue, 17 Nov 2015 20:35:00 -0500 Subject: [PATCH 08/24] Windows: change test to give windows a little time to start watcher before creating files --- tests/notify.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/notify.rs b/tests/notify.rs index e46d928b..1489058d 100644 --- a/tests/notify.rs +++ b/tests/notify.rs @@ -103,6 +103,11 @@ fn validate_watch_dir(ctor: F) where let mut w = ctor(tx).unwrap(); w.watch(dir.path()).unwrap(); + // Windows needs some time for thread spinup before we start creating files. + if cfg!(target_os = "windows") { + thread::sleep_ms(5); + } + let f111 = NamedTempFile::new_in(dir11.path()).unwrap(); let f111_path = f111.path().to_owned(); let f111_path = f111_path.as_path(); From 108e08cc88292484cc3c604203c24cc465515ce4 Mon Sep 17 00:00:00 2001 From: John Quigley Date: Tue, 17 Nov 2015 21:00:23 -0500 Subject: [PATCH 09/24] Windows: allow write on the monitored directory - Otherwise files can't be renamed/deleted! --- src/windows.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/windows.rs b/src/windows.rs index 19931a58..40909628 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -94,7 +94,7 @@ impl ReadDirectoryChangesServer { handle = kernel32::CreateFileW( encoded_path.as_ptr(), winnt::FILE_LIST_DIRECTORY, - winnt::FILE_SHARE_READ | winnt::FILE_SHARE_DELETE, + winnt::FILE_SHARE_READ | winnt::FILE_SHARE_DELETE | winnt::FILE_SHARE_WRITE, ptr::null_mut(), fileapi::OPEN_EXISTING, winbase::FILE_FLAG_BACKUP_SEMANTICS | winbase::FILE_FLAG_OVERLAPPED, From 549da9ff4f94c435a6cd90a9762013953471a30b Mon Sep 17 00:00:00 2001 From: John Quigley Date: Wed, 18 Nov 2015 12:32:10 -0500 Subject: [PATCH 10/24] Windows: implement single file watch - The file or directory must exist when watch is called, since we use its type to determine how to watch it. - File watches are essentially emulated by watching the parent directory and excluding changes to anything but the target file. This allows reuse of the ReadDirectoryChangesW API. --- src/windows.rs | 103 ++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 81 insertions(+), 22 deletions(-) diff --git a/src/windows.rs b/src/windows.rs index 40909628..dd3796de 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -22,11 +22,17 @@ use super::{Event, Error, op, Op, Watcher}; const BUF_SIZE: u32 = 16384; +#[derive(Debug,Clone)] +struct ReadData { + dir: PathBuf, // directory that is being watched + file: Option, // if a file is being watched, this is its full path +} + struct ReadDirectoryRequest { tx: Sender, buffer: [u8; BUF_SIZE as usize], handle: HANDLE, - root: PathBuf + data: ReadData } enum Action { @@ -88,7 +94,25 @@ impl ReadDirectoryChangesServer { } fn add_watch(&mut self, path: PathBuf) { - let encoded_path: Vec = path.as_os_str().encode_wide().chain(Some(0)).collect(); + // path must exist and be either a file or directory + if !path.is_dir() && !path.is_file() { + let _ = self.tx.send(Event { + path: Some(path), + op: Err(Error::Generic("Input watch path is neither a file nor a directory.".to_owned())) + }); + return; + } + + let (watching_file,dir_target) = { + if path.is_dir() { + (false,path.clone()) + } else { + // emulate file watching by watching the parent directory + (true,path.parent().unwrap().to_path_buf()) + } + }; + + let encoded_path: Vec = dir_target.as_os_str().encode_wide().chain(Some(0)).collect(); let handle; unsafe { handle = kernel32::CreateFileW( @@ -101,16 +125,30 @@ impl ReadDirectoryChangesServer { ptr::null_mut()); if handle == INVALID_HANDLE_VALUE { + let err = if watching_file { + Err(Error::Generic("You attempted to watch a single file, but parent directory could not be opened.".to_owned())) + } else { + // TODO: Call GetLastError for better error info? + Err(Error::PathNotFound) + }; let _ = self.tx.send(Event { path: None, - // TODO: Call GetLastError for better error info? - op: Err(Error::PathNotFound) + op: err }); return; } } + let wf = if watching_file { + Some(path.clone()) + } else { + None + }; + let rd = ReadData { + dir: dir_target, + file: wf + }; self.watches.insert(path.clone(), handle); - start_read(&path, &self.tx, handle); + start_read(&rd, &self.tx, handle); } fn remove_watch(&mut self, path: PathBuf) { @@ -128,12 +166,12 @@ unsafe fn close_handle(handle: HANDLE) { kernel32::CloseHandle(handle); } -fn start_read(path: &PathBuf, tx: &Sender, handle: HANDLE) { +fn start_read(rd: &ReadData, tx: &Sender, handle: HANDLE) { let mut request = Box::new(ReadDirectoryRequest { tx: tx.clone(), handle: handle, buffer: [0u8; BUF_SIZE as usize], - root: path.clone() + data: rd.clone() }); let flags = winnt::FILE_NOTIFY_CHANGE_FILE_NAME @@ -144,6 +182,12 @@ fn start_read(path: &PathBuf, tx: &Sender, handle: HANDLE) { | winnt::FILE_NOTIFY_CHANGE_CREATION | winnt::FILE_NOTIFY_CHANGE_SECURITY; + let monitor_subdir = if (&request.data.file).is_none() { + 1 + } else { + 0 + }; + unsafe { let mut overlapped: Box = Box::new(mem::zeroed()); // When using callback based async requests, we are allowed to use the hEvent member @@ -159,7 +203,7 @@ fn start_read(path: &PathBuf, tx: &Sender, handle: HANDLE) { handle, req_buf, BUF_SIZE, - 1, // We do want to monitor subdirectories + monitor_subdir, flags, &mut 0u32 as *mut u32, // This parameter is not used for async requests &mut *overlapped as *mut OVERLAPPED, @@ -180,7 +224,7 @@ unsafe extern "system" fn handle_event(error_code: u32, _bytes_written: u32, ove } // Get the next request queued up as soon as possible - start_read(&request.root, &request.tx, request.handle); + start_read(&request.data, &request.tx, request.handle); // The FILE_NOTIFY_INFORMATION struct has a variable length due to the variable length string // as its last member. Each struct contains an offset for getting the next entry in the buffer @@ -191,22 +235,31 @@ unsafe extern "system" fn handle_event(error_code: u32, _bytes_written: u32, ove let len = (*cur_entry).FileNameLength as usize / 2; let encoded_path: &[u16] = slice::from_raw_parts((*cur_entry).FileName.as_ptr(), len); // prepend root to get a full path - let path = request.root.join(PathBuf::from(OsString::from_wide(encoded_path))); - - let op = match (*cur_entry).Action { - winnt::FILE_ACTION_ADDED => op::CREATE, - winnt::FILE_ACTION_REMOVED => op::REMOVE, - winnt::FILE_ACTION_MODIFIED => op::WRITE, - winnt::FILE_ACTION_RENAMED_OLD_NAME | winnt::FILE_ACTION_RENAMED_NEW_NAME => op::RENAME, - _ => Op::empty() + let path = request.data.dir.join(PathBuf::from(OsString::from_wide(encoded_path))); + + // if we are watching a single file, ignore the event unless the path is exactly + // the watched file + let skip = match request.data.file { + None => false, + Some(ref watch_path) => *watch_path != path }; + if !skip { + let op = match (*cur_entry).Action { + winnt::FILE_ACTION_ADDED => op::CREATE, + winnt::FILE_ACTION_REMOVED => op::REMOVE, + winnt::FILE_ACTION_MODIFIED => op::WRITE, + winnt::FILE_ACTION_RENAMED_OLD_NAME | winnt::FILE_ACTION_RENAMED_NEW_NAME => op::RENAME, + _ => Op::empty() + }; - let evt = Event { - path: Some(path), - op: Ok(op) - }; - let _ = request.tx.send(evt); + + let evt = Event { + path: Some(path), + op: Ok(op) + }; + let _ = request.tx.send(evt); + } if (*cur_entry).NextEntryOffset == 0 { break; @@ -230,6 +283,12 @@ impl Watcher for ReadDirectoryChangesWatcher { } fn watch>(&mut self, path: P) -> Result<(), Error> { + // path must exist and be either a file or directory + let pb = path.as_ref().to_path_buf(); + if !pb.is_dir() && !pb.is_file() { + return Err(Error::Generic("Input watch path is neither a file nor a directory.".to_owned())); + } + self.tx.send(Action::Watch(path.as_ref().to_path_buf())) .map_err(|_| Error::Generic("Error sending to internal channel".to_owned())) } From 7e3ead613ffa811a15218e30805b9a84ee636116 Mon Sep 17 00:00:00 2001 From: John Quigley Date: Wed, 18 Nov 2015 12:33:50 -0500 Subject: [PATCH 11/24] Windows: implement test for single file watcher - Windows needs a special variant of this test to deal with its limitations. - This also modifies validate_recv to return the event list, so that the calling test can check more properties --- tests/notify.rs | 65 +++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 55 insertions(+), 10 deletions(-) diff --git a/tests/notify.rs b/tests/notify.rs index 1489058d..9e807df5 100644 --- a/tests/notify.rs +++ b/tests/notify.rs @@ -53,31 +53,76 @@ fn resolve_path(path: &Path) -> PathBuf { PathBuf::from(p) } -fn validate_recv(rx: Receiver, evs: Vec<(&Path, Op)>) { +fn validate_recv(rx: Receiver, evs: Vec<(&Path, Op)>) -> Vec { let deadline = time::precise_time_s() + TIMEOUT_S; let mut evs = evs.clone(); + let mut received_events:Vec = Vec::new(); + while time::precise_time_s() < deadline { if let Ok(actual) = rx.try_recv() { let path = actual.path.clone().unwrap(); - let op = actual.op.unwrap().clone(); - let mut removables = vec!(); - for i in (0..evs.len()) { - let expected = evs.get(i).unwrap(); - if path.clone().as_path() == expected.0 && op.contains(expected.1) { - removables.push(i); + match actual.op { + Err(e) => panic!("unexpected err: {:?}", e), + Ok(op) => { + let mut removables = vec!(); + for i in (0..evs.len()) { + let expected = evs.get(i).unwrap(); + if path.clone().as_path() == expected.0 && op.contains(expected.1) { + removables.push(i); + } + } + for removable in removables { + evs.remove(removable); + } } } - for removable in removables { - evs.remove(removable); - } + + received_events.push(actual); } if evs.is_empty() { break; } } assert!(evs.is_empty(), "Some expected events did not occur before the test timedout:\n\t\t{:?}", evs); + + received_events +} + +#[cfg(target_os = "windows")] +// Windows needs to test this differently since it can't watch files that don't exist yet. +fn validate_watch_single_file(ctor: F) where + F: Fn(Sender) -> Result, W: Watcher { + + let (tx, rx) = channel(); + let mut w = ctor(tx).unwrap(); + + // While the file is open, windows won't report modified events for it. + // Flushing doesn't help. So make sure it is closed before we validate. + let path = { + let mut file = NamedTempFile::new().unwrap(); + w.watch(file.path()).unwrap(); + thread::sleep_ms(1000); // give watcher enough time to spin up + + // make some files that should be exlcuded from watch. this works because tempfile creates + // them all in the same directory. + let mut excluded_file = NamedTempFile::new().unwrap(); + let another_excluded_file = NamedTempFile::new().unwrap(); + excluded_file.write_all(b"shouldn't get an event for this").unwrap(); + + file.write_all(b"foo").unwrap(); + file.flush().unwrap(); + file.path().to_path_buf() + }; + let events = validate_recv(rx, vec![(path.as_path(), op::WRITE), + (path.as_path(), op::REMOVE)]); + + // make sure that ONLY the target path is in the list of received events + for evt in events { + assert!(evt.path.unwrap() == path.as_path()); + } } +#[cfg(not(target_os = "windows"))] fn validate_watch_single_file(ctor: F) where F: Fn(Sender) -> Result, W: Watcher { let mut file = NamedTempFile::new().unwrap(); From 8f00aa1a12e687d8c7976ef1c119e8aa62ed75ae Mon Sep 17 00:00:00 2001 From: John Quigley Date: Wed, 18 Nov 2015 13:21:57 -0500 Subject: [PATCH 12/24] Add appveyor file --- appveyor.yml | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 appveyor.yml diff --git a/appveyor.yml b/appveyor.yml new file mode 100644 index 00000000..3c14cfec --- /dev/null +++ b/appveyor.yml @@ -0,0 +1,18 @@ +environment: + matrix: + - TARGET: x86_64-pc-windows-msvc + - TARGET: x86_64-pc-windows-gnu + - TARGET: i686-pc-windows-msvc + - TARGET: i686-pc-windows-gnu +install: + - ps: Start-FileDownload "https://static.rust-lang.org/dist/rust-nightly-${env:TARGET}.exe" + - rust-nightly-%TARGET%.exe /VERYSILENT /NORESTART /DIR="C:\Program Files (x86)\Rust" + - SET PATH=%PATH%;C:\Program Files (x86)\Rust\bin + - SET PATH=%PATH%;C:\MinGW\bin + - rustc -V + - cargo -V + +build: false + +test_script: + - cargo test --verbose From c536e64255712c2df8f0d1bfaae91420bfdc96dc Mon Sep 17 00:00:00 2001 From: John Quigley Date: Wed, 18 Nov 2015 13:35:22 -0500 Subject: [PATCH 13/24] appveyor: don't build the gnu targets --- appveyor.yml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/appveyor.yml b/appveyor.yml index 3c14cfec..9de0be60 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -1,9 +1,10 @@ environment: matrix: - TARGET: x86_64-pc-windows-msvc - - TARGET: x86_64-pc-windows-gnu - TARGET: i686-pc-windows-msvc - - TARGET: i686-pc-windows-gnu + # skip the gnu targets to reduce build time + #- TARGET: x86_64-pc-windows-gnu + #- TARGET: i686-pc-windows-gnu install: - ps: Start-FileDownload "https://static.rust-lang.org/dist/rust-nightly-${env:TARGET}.exe" - rust-nightly-%TARGET%.exe /VERYSILENT /NORESTART /DIR="C:\Program Files (x86)\Rust" From 34b9c75c360a8baafda6860df3c2a3e2903696a7 Mon Sep 17 00:00:00 2001 From: John Quigley Date: Wed, 18 Nov 2015 21:50:52 -0500 Subject: [PATCH 14/24] Windows: fix some winapi imports --- src/windows.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/windows.rs b/src/windows.rs index dd3796de..810f5e0a 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -1,11 +1,7 @@ -extern crate libc; extern crate kernel32; -extern crate winapi; -use winapi::{HANDLE, INVALID_HANDLE_VALUE, fileapi, winbase, winnt}; -use winapi::minwinbase::{OVERLAPPED, LPOVERLAPPED}; -use winapi::winerror::ERROR_OPERATION_ABORTED; -use winapi::winnt::FILE_NOTIFY_INFORMATION; +use winapi::{OVERLAPPED, LPOVERLAPPED, HANDLE, INVALID_HANDLE_VALUE, + ERROR_OPERATION_ABORTED, FILE_NOTIFY_INFORMATION, fileapi, winbase, winnt}; use std::collections::HashMap; use std::mem; From 672072a6787ae3f6db4dc65058e16e6e5300bf38 Mon Sep 17 00:00:00 2001 From: John Quigley Date: Wed, 18 Nov 2015 21:51:18 -0500 Subject: [PATCH 15/24] Windows: remove needless return (clippy) --- src/windows.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/windows.rs b/src/windows.rs index 810f5e0a..dc58e779 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -273,9 +273,9 @@ impl Watcher for ReadDirectoryChangesWatcher { fn new(event_tx: Sender) -> Result { let action_tx = ReadDirectoryChangesServer::start(event_tx); - return Ok(ReadDirectoryChangesWatcher { + Ok(ReadDirectoryChangesWatcher { tx: action_tx - }); + }) } fn watch>(&mut self, path: P) -> Result<(), Error> { From 6c658c6b772b20d01a29bfa53876ffbbda79df1c Mon Sep 17 00:00:00 2001 From: John Quigley Date: Wed, 18 Nov 2015 22:43:37 -0500 Subject: [PATCH 16/24] Windows: wait in alertable sleep on shutdown - Allows final read callback to fire, so that we can destroy outstanding request memory --- src/windows.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/windows.rs b/src/windows.rs index dc58e779..c6bdf8da 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -73,6 +73,11 @@ impl ReadDirectoryChangesServer { close_handle(*handle); } } + // wait for final read callback. required to avoid leaking callback + // memory + unsafe { + kernel32::SleepEx(500, 1); + } break; } } @@ -215,7 +220,8 @@ unsafe extern "system" fn handle_event(error_code: u32, _bytes_written: u32, ove let request: Box = mem::transmute(overlapped.hEvent); if error_code == ERROR_OPERATION_ABORTED { - // We receive this error when the directory for this request is unwatched? + // received when dir is unwatched or watcher is shutdown; return and let overlapped/request + // get drop-cleaned return; } From 3c309b7bc3c5cd42b3102c63e9ebdd9949d4bba7 Mon Sep 17 00:00:00 2001 From: John Quigley Date: Wed, 18 Nov 2015 22:44:14 -0500 Subject: [PATCH 17/24] Add me to contributors --- Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.toml b/Cargo.toml index 3c8281ad..2d7a52a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ authors = [ "Francisco Giordano ", "Jake Kerr", "Ty Overby ", + "John Quigley " ] description = "Cross-platform filesystem notification library" From e6434a40806138301f2ebec5f33a036e9b79d43d Mon Sep 17 00:00:00 2001 From: John Quigley Date: Thu, 19 Nov 2015 10:47:59 -0500 Subject: [PATCH 18/24] Windows: add a failing test that shows the shutdown leak - This required adding a new channel to the watcher that allows the test to observe internal changes ("Meta Events") --- src/windows.rs | 34 +++++++++++++++++++++++-------- tests/windows.rs | 52 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 8 deletions(-) create mode 100644 tests/windows.rs diff --git a/src/windows.rs b/src/windows.rs index c6bdf8da..366539cb 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -18,10 +18,11 @@ use super::{Event, Error, op, Op, Watcher}; const BUF_SIZE: u32 = 16384; -#[derive(Debug,Clone)] +#[derive(Clone)] struct ReadData { dir: PathBuf, // directory that is being watched file: Option, // if a file is being watched, this is its full path + meta_tx: Sender, } struct ReadDirectoryRequest { @@ -37,19 +38,25 @@ enum Action { Stop } +pub enum MetaEvent { + WatcherComplete +} + struct ReadDirectoryChangesServer { rx: Receiver, tx: Sender, + meta_tx: Sender, watches: HashMap } impl ReadDirectoryChangesServer { - fn start(event_tx: Sender) -> Sender { + fn start(event_tx: Sender, meta_tx: Sender) -> Sender { let (action_tx, action_rx) = channel(); thread::spawn(move || { let server = ReadDirectoryChangesServer { tx: event_tx, rx: action_rx, + meta_tx: meta_tx, watches: HashMap::new() }; server.run(); @@ -146,7 +153,8 @@ impl ReadDirectoryChangesServer { }; let rd = ReadData { dir: dir_target, - file: wf + file: wf, + meta_tx: self.meta_tx.clone(), }; self.watches.insert(path.clone(), handle); start_read(&rd, &self.tx, handle); @@ -220,6 +228,7 @@ unsafe extern "system" fn handle_event(error_code: u32, _bytes_written: u32, ove let request: Box = mem::transmute(overlapped.hEvent); if error_code == ERROR_OPERATION_ABORTED { + let _ = request.data.meta_tx.send(MetaEvent::WatcherComplete); // received when dir is unwatched or watcher is shutdown; return and let overlapped/request // get drop-cleaned return; @@ -275,13 +284,22 @@ pub struct ReadDirectoryChangesWatcher { tx: Sender } +impl ReadDirectoryChangesWatcher { + pub fn create(event_tx: Sender, meta_tx: Sender) -> + Result { + let action_tx = ReadDirectoryChangesServer::start(event_tx,meta_tx); + + Ok(ReadDirectoryChangesWatcher { + tx: action_tx + }) + } +} + impl Watcher for ReadDirectoryChangesWatcher { fn new(event_tx: Sender) -> Result { - let action_tx = ReadDirectoryChangesServer::start(event_tx); - - Ok(ReadDirectoryChangesWatcher { - tx: action_tx - }) + // create dummy channel for meta event + let (tx, _) = channel(); + ReadDirectoryChangesWatcher::create(event_tx, tx) } fn watch>(&mut self, path: P) -> Result<(), Error> { diff --git a/tests/windows.rs b/tests/windows.rs new file mode 100644 index 00000000..af586f89 --- /dev/null +++ b/tests/windows.rs @@ -0,0 +1,52 @@ +extern crate notify; +extern crate tempdir; +extern crate tempfile; +extern crate time; + + +use notify::*; +use std::io::Write; +use std::path::{Path, PathBuf}; +use std::thread; +use std::sync::mpsc::{channel, Sender, Receiver}; +use tempdir::TempDir; +use tempfile::NamedTempFile; + +#[cfg(target_os="windows")] +#[test] +fn shutdown() { + // create a watcher for n directories. start the watcher, then shut it down. inspect + // the watcher to make sure that it received final callbacks for all n watchers. + + let mut dirs:Vec = Vec::new(); + let dir_count = 100; + + // to get meta events, we have to pass in the meta channel + let (meta_tx,meta_rx) = channel(); + + { + let (tx, _) = channel(); + let mut w = ReadDirectoryChangesWatcher::create(tx,meta_tx).unwrap(); + + for _ in 0..dir_count { + let d = TempDir::new("d").unwrap(); + //println!("{:?}", d.path()); + w.watch(d.path()).unwrap(); + dirs.push(d); + } + } + + const TIMEOUT_S: f64 = 4.0; + let deadline = time::precise_time_s() + TIMEOUT_S; + let mut watchers_shutdown = 0; + while time::precise_time_s() < deadline { + if let Ok(actual) = meta_rx.try_recv() { + match actual { + WatcherComplete => watchers_shutdown += 1 + } + } + thread::sleep_ms(50); + } + + assert_eq!(watchers_shutdown,dir_count); +} From 02bca4456fc4005e2d56f75d3ab71adf8f2083ea Mon Sep 17 00:00:00 2001 From: John Quigley Date: Thu, 19 Nov 2015 15:34:00 -0500 Subject: [PATCH 19/24] Windows: fix shutdown leak - Each watcher now has a a semaphore that is used to indicate when the final completion routine is called (or a read failed) - Also added some general error handling to make the code more robust in the face of deleted watch directories, etc --- src/windows.rs | 77 +++++++++++++++++++++++++++++++++--------------- tests/windows.rs | 65 ++++++++++++++++++++++++++++++---------- 2 files changed, 103 insertions(+), 39 deletions(-) diff --git a/src/windows.rs b/src/windows.rs index 366539cb..b57e2e6b 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -1,6 +1,6 @@ extern crate kernel32; -use winapi::{OVERLAPPED, LPOVERLAPPED, HANDLE, INVALID_HANDLE_VALUE, +use winapi::{OVERLAPPED, LPOVERLAPPED, HANDLE, INVALID_HANDLE_VALUE, INFINITE, TRUE, ERROR_OPERATION_ABORTED, FILE_NOTIFY_INFORMATION, fileapi, winbase, winnt}; use std::collections::HashMap; @@ -22,7 +22,7 @@ const BUF_SIZE: u32 = 16384; struct ReadData { dir: PathBuf, // directory that is being watched file: Option, // if a file is being watched, this is its full path - meta_tx: Sender, + complete_sem: HANDLE, } struct ReadDirectoryRequest { @@ -39,14 +39,19 @@ enum Action { } pub enum MetaEvent { - WatcherComplete + SingleWatchComplete +} + +struct WatchState { + dir_handle: HANDLE, + complete_sem: HANDLE, } struct ReadDirectoryChangesServer { rx: Receiver, tx: Sender, meta_tx: Sender, - watches: HashMap + watches: HashMap } impl ReadDirectoryChangesServer { @@ -75,15 +80,8 @@ impl ReadDirectoryChangesServer { Action::Unwatch(path) => self.remove_watch(path), Action::Stop => { stopped = true; - for (_, handle) in &self.watches { - unsafe { - close_handle(*handle); - } - } - // wait for final read callback. required to avoid leaking callback - // memory - unsafe { - kernel32::SleepEx(500, 1); + for (_, ws) in &self.watches { + stop_watch(ws, &self.meta_tx); } break; } @@ -151,28 +149,50 @@ impl ReadDirectoryChangesServer { } else { None }; + // every watcher gets its own semaphore to signal completion + let semaphore = unsafe { + kernel32::CreateSemaphoreW(ptr::null_mut(), 0, 1, ptr::null_mut()) + }; + if semaphore == ptr::null_mut() || semaphore == INVALID_HANDLE_VALUE { + unsafe { kernel32::CloseHandle(handle); } + let _ = self.tx.send(Event { + path: Some(path), + op: Err(Error::Generic("Failed to create semaphore for watch.".to_owned())) + }); + return; + } let rd = ReadData { dir: dir_target, file: wf, - meta_tx: self.meta_tx.clone(), + complete_sem: semaphore + }; + let ws = WatchState { + dir_handle: handle, + complete_sem: semaphore }; - self.watches.insert(path.clone(), handle); + self.watches.insert(path.clone(), ws); start_read(&rd, &self.tx, handle); } fn remove_watch(&mut self, path: PathBuf) { - if let Some(handle) = self.watches.remove(&path) { - unsafe { - close_handle(handle); + if let Some(ws) = self.watches.remove(&path) { + stop_watch(&ws, &self.meta_tx); } } } + +fn stop_watch(ws:&WatchState,meta_tx: &Sender) { + unsafe { + let cio = kernel32::CancelIo(ws.dir_handle); + let ch = kernel32::CloseHandle(ws.dir_handle); + // have to wait for it, otherwise we leak the memory allocated for there read request + if cio != 0 && ch != 0 { + kernel32::WaitForSingleObjectEx(ws.complete_sem, INFINITE, TRUE); } + kernel32::CloseHandle(ws.complete_sem); -unsafe fn close_handle(handle: HANDLE) { - // TODO: Handle errors - kernel32::CancelIo(handle); - kernel32::CloseHandle(handle); + } + let _ = meta_tx.send(MetaEvent::SingleWatchComplete); } fn start_read(rd: &ReadData, tx: &Sender, handle: HANDLE) { @@ -208,7 +228,7 @@ fn start_read(rd: &ReadData, tx: &Sender, handle: HANDLE) { // This is using an asynchronous call with a completion routine for receiving notifications // An I/O completion port would probably be more performant - kernel32::ReadDirectoryChangesW( + let ret = kernel32::ReadDirectoryChangesW( handle, req_buf, BUF_SIZE, @@ -218,9 +238,18 @@ fn start_read(rd: &ReadData, tx: &Sender, handle: HANDLE) { &mut *overlapped as *mut OVERLAPPED, Some(handle_event)); + if ret == 0 { + // error reading. retransmute request memory to allow drop. + // allow overlapped to drop by omitting forget() + let request: Box = mem::transmute(request_p); + + kernel32::ReleaseSemaphore(request.data.complete_sem, 1, ptr::null_mut()); + } else { + // read ok. forget overlapped to let the completion routine handle memory mem::forget(overlapped); } } +} unsafe extern "system" fn handle_event(error_code: u32, _bytes_written: u32, overlapped: LPOVERLAPPED) { // TODO: Use Box::from_raw when it is no longer unstable @@ -228,9 +257,9 @@ unsafe extern "system" fn handle_event(error_code: u32, _bytes_written: u32, ove let request: Box = mem::transmute(overlapped.hEvent); if error_code == ERROR_OPERATION_ABORTED { - let _ = request.data.meta_tx.send(MetaEvent::WatcherComplete); // received when dir is unwatched or watcher is shutdown; return and let overlapped/request // get drop-cleaned + kernel32::ReleaseSemaphore(request.data.complete_sem, 1, ptr::null_mut()); return; } diff --git a/tests/windows.rs b/tests/windows.rs index af586f89..666c8af2 100644 --- a/tests/windows.rs +++ b/tests/windows.rs @@ -3,50 +3,85 @@ extern crate tempdir; extern crate tempfile; extern crate time; - use notify::*; -use std::io::Write; -use std::path::{Path, PathBuf}; use std::thread; -use std::sync::mpsc::{channel, Sender, Receiver}; +use std::sync::mpsc::{channel, Receiver}; use tempdir::TempDir; -use tempfile::NamedTempFile; +fn check_for_error(rx:&Receiver) { + while let Ok(res) = rx.try_recv() { + match res.op { + Err(e) => panic!("unexpected err: {:?}: {:?}", e, res.path), + _ => () + } + }; +} #[cfg(target_os="windows")] #[test] fn shutdown() { // create a watcher for n directories. start the watcher, then shut it down. inspect // the watcher to make sure that it received final callbacks for all n watchers. - - let mut dirs:Vec = Vec::new(); let dir_count = 100; // to get meta events, we have to pass in the meta channel let (meta_tx,meta_rx) = channel(); - + let (tx, rx) = channel(); { - let (tx, _) = channel(); + let mut dirs:Vec = Vec::new(); let mut w = ReadDirectoryChangesWatcher::create(tx,meta_tx).unwrap(); for _ in 0..dir_count { - let d = TempDir::new("d").unwrap(); + let d = TempDir::new("rsnotifytest").unwrap(); + dirs.push(d); + } + + for d in &dirs { // need the ref, otherwise its a move and the dir will be dropped! //println!("{:?}", d.path()); w.watch(d.path()).unwrap(); - dirs.push(d); } + + thread::sleep_ms(2000); // give watcher time to watch paths before we drop it + + // unwatch half of the directories, let the others get stopped when we go out of scope + for d in &dirs[0..dir_count/2] { + w.unwatch(d.path()).unwrap(); + } + + thread::sleep_ms(2000); // sleep to unhook the watches } - const TIMEOUT_S: f64 = 4.0; + check_for_error(&rx); + + const TIMEOUT_S: f64 = 60.0; // give it PLENTY of time before we declare failure let deadline = time::precise_time_s() + TIMEOUT_S; let mut watchers_shutdown = 0; - while time::precise_time_s() < deadline { + while watchers_shutdown != dir_count && time::precise_time_s() < deadline { if let Ok(actual) = meta_rx.try_recv() { match actual { - WatcherComplete => watchers_shutdown += 1 + notify::windows::MetaEvent::SingleWatchComplete => watchers_shutdown += 1 } } - thread::sleep_ms(50); + thread::sleep_ms(50); // don't burn cpu, can take some time for completion events to fire } assert_eq!(watchers_shutdown,dir_count); } + +#[cfg(target_os="windows")] +#[test] +#[ignore] +// repeatedly watch and unwatch a directory; make sure process memory does not increase. +// you use task manager to watch the memory; it will fluctuate a bit, but should not leak overall +fn memtest_manual() { + loop { + let (tx, rx) = channel(); + let d = TempDir::new("rsnotifytest").unwrap(); + { + let (meta_tx,_) = channel(); + let mut w = ReadDirectoryChangesWatcher::create(tx,meta_tx).unwrap(); + w.watch(d.path()).unwrap(); + thread::sleep_ms(1); // this should make us run pretty hot but not insane + } + check_for_error(&rx); + } +} From 7dc1f555ff065154eb9e57134631e52bb958c6dc Mon Sep 17 00:00:00 2001 From: John Quigley Date: Thu, 19 Nov 2015 15:35:16 -0500 Subject: [PATCH 20/24] Windows: increase watcher test wait time to 50ms - Seen it fail with 5ms --- tests/notify.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/notify.rs b/tests/notify.rs index 9e807df5..5eafe641 100644 --- a/tests/notify.rs +++ b/tests/notify.rs @@ -150,7 +150,7 @@ fn validate_watch_dir(ctor: F) where w.watch(dir.path()).unwrap(); // Windows needs some time for thread spinup before we start creating files. if cfg!(target_os = "windows") { - thread::sleep_ms(5); + thread::sleep_ms(250); } let f111 = NamedTempFile::new_in(dir11.path()).unwrap(); From ad5e27424d65b2467e825c1171f48724e2f1d6ea Mon Sep 17 00:00:00 2001 From: John Quigley Date: Thu, 19 Nov 2015 19:01:49 -0500 Subject: [PATCH 21/24] Windows: eliminate warning in test --- tests/notify.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/notify.rs b/tests/notify.rs index 5eafe641..8218562d 100644 --- a/tests/notify.rs +++ b/tests/notify.rs @@ -107,6 +107,7 @@ fn validate_watch_single_file(ctor: F) where // them all in the same directory. let mut excluded_file = NamedTempFile::new().unwrap(); let another_excluded_file = NamedTempFile::new().unwrap(); + let _ = another_excluded_file; // eliminate warning excluded_file.write_all(b"shouldn't get an event for this").unwrap(); file.write_all(b"foo").unwrap(); From 99a6fde68f622994269de6bb07b742141bcccff5 Mon Sep 17 00:00:00 2001 From: John Quigley Date: Thu, 19 Nov 2015 19:08:55 -0500 Subject: [PATCH 22/24] Windows: replace transmute with Box::from_raw --- src/windows.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/windows.rs b/src/windows.rs index c6bdf8da..5a33ef7e 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -215,9 +215,8 @@ fn start_read(rd: &ReadData, tx: &Sender, handle: HANDLE) { } unsafe extern "system" fn handle_event(error_code: u32, _bytes_written: u32, overlapped: LPOVERLAPPED) { - // TODO: Use Box::from_raw when it is no longer unstable - let overlapped: Box = mem::transmute(overlapped); - let request: Box = mem::transmute(overlapped.hEvent); + let overlapped: Box = Box::from_raw(overlapped); + let request: Box = Box::from_raw(overlapped.hEvent as *mut _); if error_code == ERROR_OPERATION_ABORTED { // received when dir is unwatched or watcher is shutdown; return and let overlapped/request From e717093d8eb40ed43811a1c5489514776a84a87e Mon Sep 17 00:00:00 2001 From: John Quigley Date: Fri, 20 Nov 2015 22:06:34 -0500 Subject: [PATCH 23/24] Windows: wait for reply in watch() - Allows errors to be returned immediately, rather than on the event channel - Use a semaphore to wake up the server so that watch doesn't have to block for up to 500ms - Add a new metaevent to let unit test check this --- src/windows.rs | 136 +++++++++++++++++++++++++++++++++-------------- tests/notify.rs | 4 -- tests/windows.rs | 57 ++++++++++++++++++-- 3 files changed, 150 insertions(+), 47 deletions(-) diff --git a/src/windows.rs b/src/windows.rs index fa45449e..9c1066cb 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -1,6 +1,6 @@ extern crate kernel32; -use winapi::{OVERLAPPED, LPOVERLAPPED, HANDLE, INVALID_HANDLE_VALUE, INFINITE, TRUE, +use winapi::{OVERLAPPED, LPOVERLAPPED, HANDLE, INVALID_HANDLE_VALUE, INFINITE, TRUE, WAIT_OBJECT_0, ERROR_OPERATION_ABORTED, FILE_NOTIFY_INFORMATION, fileapi, winbase, winnt}; use std::collections::HashMap; @@ -39,7 +39,8 @@ enum Action { } pub enum MetaEvent { - SingleWatchComplete + SingleWatchComplete, + WatcherAwakened } struct WatchState { @@ -51,18 +52,26 @@ struct ReadDirectoryChangesServer { rx: Receiver, tx: Sender, meta_tx: Sender, - watches: HashMap + cmd_tx: Sender>, + watches: HashMap, + wakeup_sem: HANDLE } impl ReadDirectoryChangesServer { - fn start(event_tx: Sender, meta_tx: Sender) -> Sender { + fn start(event_tx: Sender, meta_tx: Sender, cmd_tx:Sender>, wakeup_sem: HANDLE) -> Sender { + let (action_tx, action_rx) = channel(); + // it is, in fact, ok to send the semaphore across threads + let sem_temp = wakeup_sem as u64; thread::spawn(move || { + let wakeup_sem = sem_temp as HANDLE; let server = ReadDirectoryChangesServer { tx: event_tx, rx: action_rx, meta_tx: meta_tx, - watches: HashMap::new() + cmd_tx: cmd_tx, + watches: HashMap::new(), + wakeup_sem: wakeup_sem }; server.run(); }); @@ -76,7 +85,10 @@ impl ReadDirectoryChangesServer { while let Ok(action) = self.rx.try_recv() { match action { - Action::Watch(path) => self.add_watch(path), + Action::Watch(path) => { + let res = self.add_watch(path); + let _ = self.cmd_tx.send(res); + }, Action::Unwatch(path) => self.remove_watch(path), Action::Stop => { stopped = true; @@ -92,21 +104,25 @@ impl ReadDirectoryChangesServer { break; } - // call sleepex with alertable flag so that our completion routine fires unsafe { - kernel32::SleepEx(500, 1); + // wait with alertable flag so that the completion routine fires + let waitres = kernel32::WaitForSingleObjectEx(self.wakeup_sem, 500, TRUE); + if waitres == WAIT_OBJECT_0 { + let _ = self.meta_tx.send(MetaEvent::WatcherAwakened); + } } } + + // we have to clean this up, since the watcher may be long gone + unsafe { + kernel32::CloseHandle(self.wakeup_sem); + } } - fn add_watch(&mut self, path: PathBuf) { + fn add_watch(&mut self, path: PathBuf) -> Result { // path must exist and be either a file or directory if !path.is_dir() && !path.is_file() { - let _ = self.tx.send(Event { - path: Some(path), - op: Err(Error::Generic("Input watch path is neither a file nor a directory.".to_owned())) - }); - return; + return Err(Error::Generic("Input watch path is neither a file nor a directory.".to_owned())); } let (watching_file,dir_target) = { @@ -137,11 +153,7 @@ impl ReadDirectoryChangesServer { // TODO: Call GetLastError for better error info? Err(Error::PathNotFound) }; - let _ = self.tx.send(Event { - path: None, - op: err - }); - return; + return err; } } let wf = if watching_file { @@ -155,11 +167,7 @@ impl ReadDirectoryChangesServer { }; if semaphore == ptr::null_mut() || semaphore == INVALID_HANDLE_VALUE { unsafe { kernel32::CloseHandle(handle); } - let _ = self.tx.send(Event { - path: Some(path), - op: Err(Error::Generic("Failed to create semaphore for watch.".to_owned())) - }); - return; + return Err(Error::Generic("Failed to create semaphore for watch.".to_owned())); } let rd = ReadData { dir: dir_target, @@ -172,6 +180,7 @@ impl ReadDirectoryChangesServer { }; self.watches.insert(path.clone(), ws); start_read(&rd, &self.tx, handle); + Ok(path.to_path_buf()) } fn remove_watch(&mut self, path: PathBuf) { @@ -188,7 +197,7 @@ fn stop_watch(ws:&WatchState,meta_tx: &Sender) { // have to wait for it, otherwise we leak the memory allocated for there read request if cio != 0 && ch != 0 { kernel32::WaitForSingleObjectEx(ws.complete_sem, INFINITE, TRUE); -} + } kernel32::CloseHandle(ws.complete_sem); } @@ -223,7 +232,7 @@ fn start_read(rd: &ReadData, tx: &Sender, handle: HANDLE) { // for our own purposes let req_buf = request.buffer.as_mut_ptr() as *mut c_void; - let request_p: *mut c_void = mem::transmute(request); + let request_p = Box::into_raw(request) as *mut c_void; overlapped.hEvent = request_p; // This is using an asynchronous call with a completion routine for receiving notifications @@ -246,10 +255,10 @@ fn start_read(rd: &ReadData, tx: &Sender, handle: HANDLE) { kernel32::ReleaseSemaphore(request.data.complete_sem, 1, ptr::null_mut()); } else { // read ok. forget overlapped to let the completion routine handle memory - mem::forget(overlapped); + mem::forget(overlapped); + } } } -} unsafe extern "system" fn handle_event(error_code: u32, _bytes_written: u32, overlapped: LPOVERLAPPED) { let overlapped: Box = Box::from_raw(overlapped); @@ -309,25 +318,70 @@ unsafe extern "system" fn handle_event(error_code: u32, _bytes_written: u32, ove } pub struct ReadDirectoryChangesWatcher { - tx: Sender + tx: Sender, + cmd_rx: Receiver>, + wakeup_sem: HANDLE } impl ReadDirectoryChangesWatcher { - pub fn create(event_tx: Sender, meta_tx: Sender) -> - Result { - let action_tx = ReadDirectoryChangesServer::start(event_tx,meta_tx); + pub fn create(event_tx: Sender, meta_tx: Sender) -> Result { + let (cmd_tx, cmd_rx) = channel(); + + let wakeup_sem = unsafe { + kernel32::CreateSemaphoreW(ptr::null_mut(), 0, 1, ptr::null_mut()) + }; + if wakeup_sem == ptr::null_mut() || wakeup_sem == INVALID_HANDLE_VALUE { + return Err(Error::Generic("Failed to create wakeup semaphore.".to_owned())); + } + + let action_tx = ReadDirectoryChangesServer::start(event_tx,meta_tx,cmd_tx,wakeup_sem); Ok(ReadDirectoryChangesWatcher { - tx: action_tx + tx: action_tx, + cmd_rx: cmd_rx, + wakeup_sem: wakeup_sem }) } + + fn wakeup_server(&mut self) { + // breaks the server out of its wait state. right now this is really just an optimization, + // so that if you add a watch you don't block for 500ms in watch() while the + // server sleeps. + unsafe { kernel32::ReleaseSemaphore(self.wakeup_sem, 1, ptr::null_mut()); } + } + + fn send_action_require_ack(&mut self, action:Action, pb:&PathBuf) -> Result<(), Error> { + match self.tx.send(action) { + Err(_) => Err(Error::Generic("Error sending to internal channel".to_owned())), + Ok(_) => { + // wake 'em up, we don't want to wait around for the ack + self.wakeup_server(); + + match self.cmd_rx.recv() { + Err(_) => Err(Error::Generic("Error receiving from command channel".to_owned())), + Ok(ack_res) => { + match ack_res { + Err(e) => Err(Error::Generic(format!("Error in watcher: {:?}", e))), + Ok(ack_pb) => { + if pb.as_path() != ack_pb.as_path() { + Err(Error::Generic(format!("Expected ack for {:?} but got ack for {:?}", pb, ack_pb))) + } else { + Ok(()) + } + } + } + } + } + } + } + } } impl Watcher for ReadDirectoryChangesWatcher { fn new(event_tx: Sender) -> Result { // create dummy channel for meta event - let (tx, _) = channel(); - ReadDirectoryChangesWatcher::create(event_tx, tx) + let (meta_tx, _) = channel(); + ReadDirectoryChangesWatcher::create(event_tx, meta_tx) } fn watch>(&mut self, path: P) -> Result<(), Error> { @@ -336,19 +390,21 @@ impl Watcher for ReadDirectoryChangesWatcher { if !pb.is_dir() && !pb.is_file() { return Err(Error::Generic("Input watch path is neither a file nor a directory.".to_owned())); } - - self.tx.send(Action::Watch(path.as_ref().to_path_buf())) - .map_err(|_| Error::Generic("Error sending to internal channel".to_owned())) + self.send_action_require_ack(Action::Watch(path.as_ref().to_path_buf()), &pb) } fn unwatch>(&mut self, path: P) -> Result<(), Error> { - self.tx.send(Action::Unwatch(path.as_ref().to_path_buf())) - .map_err(|_| Error::Generic("Error sending to internal channel".to_owned())) + let res = self.tx.send(Action::Unwatch(path.as_ref().to_path_buf())) + .map_err(|_| Error::Generic("Error sending to internal channel".to_owned())); + self.wakeup_server(); + res } } impl Drop for ReadDirectoryChangesWatcher { fn drop(&mut self) { let _ = self.tx.send(Action::Stop); + // better wake it up + self.wakeup_server(); } } diff --git a/tests/notify.rs b/tests/notify.rs index 8218562d..8a47fb93 100644 --- a/tests/notify.rs +++ b/tests/notify.rs @@ -149,10 +149,6 @@ fn validate_watch_dir(ctor: F) where let mut w = ctor(tx).unwrap(); w.watch(dir.path()).unwrap(); - // Windows needs some time for thread spinup before we start creating files. - if cfg!(target_os = "windows") { - thread::sleep_ms(250); - } let f111 = NamedTempFile::new_in(dir11.path()).unwrap(); let f111_path = f111.path().to_owned(); diff --git a/tests/windows.rs b/tests/windows.rs index 666c8af2..f1ad84e8 100644 --- a/tests/windows.rs +++ b/tests/windows.rs @@ -40,8 +40,6 @@ fn shutdown() { w.watch(d.path()).unwrap(); } - thread::sleep_ms(2000); // give watcher time to watch paths before we drop it - // unwatch half of the directories, let the others get stopped when we go out of scope for d in &dirs[0..dir_count/2] { w.unwatch(d.path()).unwrap(); @@ -58,7 +56,8 @@ fn shutdown() { while watchers_shutdown != dir_count && time::precise_time_s() < deadline { if let Ok(actual) = meta_rx.try_recv() { match actual { - notify::windows::MetaEvent::SingleWatchComplete => watchers_shutdown += 1 + notify::windows::MetaEvent::SingleWatchComplete => watchers_shutdown += 1, + _ => () } } thread::sleep_ms(50); // don't burn cpu, can take some time for completion events to fire @@ -67,6 +66,58 @@ fn shutdown() { assert_eq!(watchers_shutdown,dir_count); } +#[cfg(target_os="windows")] +#[test] +fn watch_deleted_fails() { + let pb = { + let d = TempDir::new("rsnotifytest").unwrap(); + d.path().to_path_buf() + }; + + let (tx, _) = channel(); + let mut w = ReadDirectoryChangesWatcher::new(tx).unwrap(); + match w.watch(pb.as_path()) { + Ok(x) => panic!("Should have failed, but got: {:?}", x), + Err(_) => () + } +} + +#[cfg(target_os="windows")] +#[test] +fn watch_server_can_be_awakened() { + let (tx, _) = channel(); + let (meta_tx,meta_rx) = channel(); + let mut w = ReadDirectoryChangesWatcher::create(tx,meta_tx).unwrap(); + let d = TempDir::new("rsnotifytest").unwrap(); + let d2 = TempDir::new("rsnotifytest").unwrap(); + + match w.watch(d.path()) { + Ok(_) => (), + Err(e) => panic!("Oops: {:?}", e) + } + match w.watch(d2.path()) { + Ok(_) => (), + Err(e) => panic!("Oops: {:?}", e) + } + // should be at least one awaken in there + const TIMEOUT_S: f64 = 5.0; + let deadline = time::precise_time_s() + TIMEOUT_S; + let mut awakened = false; + while time::precise_time_s() < deadline { + if let Ok(actual) = meta_rx.try_recv() { + match actual { + notify::windows::MetaEvent::WatcherAwakened => awakened = true, + _ => () + } + } + thread::sleep_ms(50); + } + + if !awakened { + panic!("Failed to awaken"); + } +} + #[cfg(target_os="windows")] #[test] #[ignore] From 50af1c6b306708b18f5df1598b9cbff2373d74d6 Mon Sep 17 00:00:00 2001 From: John Quigley Date: Fri, 20 Nov 2015 22:21:28 -0500 Subject: [PATCH 24/24] Windows: fix atom indent fail --- src/windows.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/windows.rs b/src/windows.rs index 9c1066cb..3355cb24 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -186,9 +186,9 @@ impl ReadDirectoryChangesServer { fn remove_watch(&mut self, path: PathBuf) { if let Some(ws) = self.watches.remove(&path) { stop_watch(&ws, &self.meta_tx); - } } } +} fn stop_watch(ws:&WatchState,meta_tx: &Sender) { unsafe {