Skip to content

Commit

Permalink
Merge pull request #214 from mitchmindtree/event_fn
Browse files Browse the repository at this point in the history
Remove channel in favour of accepting `Fn` in Watcher constructors
  • Loading branch information
mitchmindtree authored Oct 3, 2019
2 parents 361cfd7 + 5c1467a commit 97a870f
Show file tree
Hide file tree
Showing 8 changed files with 271 additions and 220 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- CHANGE: Remove `Op` and `DebouncedEvent` event classification. [#187]
- CHANGE: Make it opt-in to receive information about event kind. [#187]
- CHANGE: Make `Notice` events opt-in.
- CHANGE: Remove `Sender`s from watcher API in favour of `EventFn` [#214]

## 5.0.0-pre.2

Expand Down
14 changes: 6 additions & 8 deletions examples/monitor_raw.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,25 @@
use crossbeam_channel::unbounded;
use notify::{RecommendedWatcher, RecursiveMode, Watcher};
use std::path::Path;

fn watch<P: AsRef<Path>>(path: P) -> notify::Result<()> {
// Create a channel to receive the events.
let (tx, rx) = unbounded();
let (tx, rx) = std::sync::mpsc::channel();

// Automatically select the best implementation for your platform.
// You can also access each implementation directly e.g. INotifyWatcher.
let mut watcher: RecommendedWatcher = Watcher::new_immediate(tx)?;
let mut watcher: RecommendedWatcher = Watcher::new_immediate(move |res| tx.send(res).unwrap())?;

// Add a path to be watched. All files and directories at that path and
// below will be monitored for changes.
watcher.watch(path, RecursiveMode::Recursive)?;

// This is a simple loop, but you may want to use more complex logic here,
// for example to handle I/O.
loop {
match rx.recv() {
for res in rx {
match res {
Ok(event) => println!("changed: {:?}", event),
Err(e) => println!("watch error: {:?}", e),
}
}

Ok(())
}

fn main() {
Expand Down
90 changes: 51 additions & 39 deletions src/fsevent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
#![allow(non_upper_case_globals, dead_code)]

use crate::{Config, Error, EventTx, RecursiveMode, Result, Watcher};
use crate::{Config, Error, EventFn, RecursiveMode, Result, Watcher};
use crate::event::*;
use crossbeam_channel::{unbounded, Receiver, Sender};
use fsevent as fse;
Expand All @@ -30,6 +30,7 @@ use std::path::{Path, PathBuf};
use std::ptr;
use std::slice;
use std::str::from_utf8;
use std::sync::Arc;
use std::thread;

/// FSEvents-based `Watcher` implementation
Expand All @@ -38,7 +39,7 @@ pub struct FsEventWatcher {
since_when: fs::FSEventStreamEventId,
latency: cf::CFTimeInterval,
flags: fs::FSEventStreamCreateFlags,
event_tx: EventTx,
event_fn: Arc<dyn EventFn>,
runloop: Option<usize>,
context: Option<Box<StreamContextInfo>>,
recursive_info: HashMap<PathBuf, bool>,
Expand Down Expand Up @@ -188,7 +189,7 @@ fn translate_flags(flags: fse::StreamFlags, precise: bool) -> Vec<Event> {
}

struct StreamContextInfo {
event_tx: EventTx,
event_fn: Arc<dyn EventFn>,
done: Receiver<()>,
recursive_info: HashMap<PathBuf, bool>,
}
Expand All @@ -198,6 +199,37 @@ extern "C" {
}

impl FsEventWatcher {
fn from_event_fn(event_fn: Arc<dyn EventFn>) -> Result<Self> {
Ok(FsEventWatcher {
paths: unsafe {
cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks)
},
since_when: fs::kFSEventStreamEventIdSinceNow,
latency: 0.0,
flags: fs::kFSEventStreamCreateFlagFileEvents | fs::kFSEventStreamCreateFlagNoDefer,
event_fn: event_fn,
runloop: None,
context: None,
recursive_info: HashMap::new(),
})
}

fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
self.stop();
let result = self.append_path(path, recursive_mode);
// ignore return error: may be empty path list
let _ = self.run();
result
}

fn unwatch_inner(&mut self, path: &Path) -> Result<()> {
self.stop();
let result = self.remove_path(path);
// ignore return error: may be empty path list
let _ = self.run();
result
}

#[inline]
fn is_running(&self) -> bool {
self.runloop.is_some()
Expand Down Expand Up @@ -296,7 +328,7 @@ impl FsEventWatcher {
let (done_tx, done_rx) = unbounded();

let info = StreamContextInfo {
event_tx: self.event_tx.clone(),
event_fn: self.event_fn.clone(),
done: done_rx,
recursive_info: self.recursive_info.clone(),
};
Expand Down Expand Up @@ -384,7 +416,7 @@ pub unsafe extern "C" fn callback(
let flags = slice::from_raw_parts_mut(e_ptr, num);
let ids = slice::from_raw_parts_mut(i_ptr, num);

let event_tx = &(*info).event_tx;
let event_fn = &(*info).event_fn;

for p in 0..num {
let i = CStr::from_ptr(paths[p]).to_bytes();
Expand Down Expand Up @@ -412,41 +444,25 @@ pub unsafe extern "C" fn callback(
if !handle_event { continue; }

for ev in translate_flags(flag, true).into_iter() { // TODO: precise
event_tx.send(Ok(ev.add_path(path.clone()))).ok(); // TODO: handle error
let ev = ev.add_path(path.clone());
(*event_fn)(Ok(ev));
}
}
}



impl Watcher for FsEventWatcher {
fn new_immediate(tx: Sender<Result<Event>>) -> Result<FsEventWatcher> {
Ok(FsEventWatcher {
paths: unsafe {
cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks)
},
since_when: fs::kFSEventStreamEventIdSinceNow,
latency: 0.0,
flags: fs::kFSEventStreamCreateFlagFileEvents | fs::kFSEventStreamCreateFlagNoDefer,
event_tx: tx,
runloop: None,
context: None,
recursive_info: HashMap::new(),
})
fn new_immediate<F: EventFn>(event_fn: F) -> Result<FsEventWatcher> {
FsEventWatcher::from_event_fn(Arc::new(event_fn))
}

fn watch<P: AsRef<Path>>(&mut self, path: P, recursive_mode: RecursiveMode) -> Result<()> {
self.stop();
let result = self.append_path(path, recursive_mode);
// ignore return error: may be empty path list
let _ = self.run();
result
self.watch_inner(path.as_ref(), recursive_mode)
}

fn unwatch<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
self.stop();
let result = self.remove_path(path);
// ignore return error: may be empty path list
let _ = self.run();
result
self.unwatch_inner(path.as_ref())
}

fn configure(&mut self, config: Config) -> Result<bool> {
Expand All @@ -470,10 +486,11 @@ fn test_fsevent_watcher_drop() {
use super::*;
use std::time::Duration;

let (tx, rx) = unbounded();
let (tx, rx) = std::sync::mpsc::channel();
let event_fn = move |res| tx.send(res).unwrap();

{
let mut watcher: RecommendedWatcher = Watcher::new_immediate(tx).unwrap();
let mut watcher: RecommendedWatcher = Watcher::new_immediate(event_fn).unwrap();
watcher.watch("../../", RecursiveMode::Recursive).unwrap();
thread::sleep(Duration::from_millis(2000));
println!("is running -> {}", watcher.is_running());
Expand All @@ -485,14 +502,9 @@ fn test_fsevent_watcher_drop() {

thread::sleep(Duration::from_millis(1000));

// if drop() works, this loop will quit after all Sender freed
// otherwise will block forever
for e in rx.iter() {
println!(
"debug => {:?} {:?}",
e.op.map(|e| e.bits()).unwrap_or(0),
e.path
);
for res in rx {
let e = res.unwrap();
println!("debug => {:?} {:?}", e.kind, e.paths);
}

println!("in test: {} works", file!());
Expand Down
Loading

0 comments on commit 97a870f

Please sign in to comment.