Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add PollWatcher scan events #507

Merged
merged 1 commit into from
Jul 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ path = "poll_sysfs.rs"
name = "watcher_kind"
path = "watcher_kind.rs"

[[example]]
name = "pollwatcher_scan"
path = "pollwatcher_scan.rs"

# specifically in its own sub folder
# to prevent cargo audit from complaining
#[[example]]
Expand Down
55 changes: 55 additions & 0 deletions examples/pollwatcher_scan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use notify::{poll::ScanEvent, Config, PollWatcher, RecursiveMode, Watcher};
use std::path::Path;

// Example for the pollwatcher scan callback feature.
// Returns the scanned paths
fn main() {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();

let path = std::env::args()
.nth(1)
.expect("Argument 1 needs to be a path");

log::info!("Watching {path}");

if let Err(error) = watch(path) {
log::error!("Error: {error:?}");
}
}

fn watch<P: AsRef<Path>>(path: P) -> notify::Result<()> {
let (tx, rx) = std::sync::mpsc::channel();

// if you want to use the same channel for both events
// and you need to differentiate between scan and file change events,
// then you will have to use something like this
enum Message {
Event(notify::Result<notify::Event>),
Scan(ScanEvent),
}

let tx_c = tx.clone();
// use the pollwatcher and set a callback for the scanning events
let mut watcher = PollWatcher::with_initial_scan(
move |watch_event| {
tx_c.send(Message::Event(watch_event)).unwrap();
},
Config::default(),
move |scan_event| {
tx.send(Message::Scan(scan_event)).unwrap();
},
)?;

// Add a path to be watched. All files and directories at that path and
// below will be monitored for changes.
watcher.watch(path.as_ref(), RecursiveMode::Recursive)?;

for res in rx {
match res {
Message::Event(e) => println!("Watch event {e:?}"),
Message::Scan(e) => println!("Scan event {e:?}"),
}
}

Ok(())
}
109 changes: 102 additions & 7 deletions notify/src/poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,45 @@ use std::{
time::Duration,
};

/// Event send for registered handler on initial directory scans
pub type ScanEvent = crate::Result<PathBuf>;

/// Handler trait for receivers of ScanEvent.
/// Very much the same as [EventHandler], but including the Result.
///
/// See the full example for more information.
/// ```
pub trait ScanEventHandler: Send + 'static {
/// Handles an event.
fn handle_event(&mut self, event: ScanEvent);
}

impl<F> ScanEventHandler for F
where
F: FnMut(ScanEvent) + Send + 'static,
{
fn handle_event(&mut self, event: ScanEvent) {
(self)(event);
}
}

#[cfg(feature = "crossbeam-channel")]
impl ScanEventHandler for crossbeam_channel::Sender<ScanEvent> {
fn handle_event(&mut self, event: ScanEvent) {
let _ = self.send(event);
}
}

impl ScanEventHandler for std::sync::mpsc::Sender<ScanEvent> {
fn handle_event(&mut self, event: ScanEvent) {
let _ = self.send(event);
}
}

impl ScanEventHandler for () {
fn handle_event(&mut self, _event: ScanEvent) {}
}

use data::{DataBuilder, WatchData};
mod data {
use crate::{
Expand All @@ -34,9 +73,12 @@ mod data {
};
use walkdir::WalkDir;

use super::ScanEventHandler;

/// Builder for [`WatchData`] & [`PathData`].
pub(super) struct DataBuilder {
emitter: EventEmitter,
scan_emitter: Option<Box<RefCell<dyn ScanEventHandler>>>,

// TODO: May allow user setup their custom BuildHasher / BuildHasherDefault
// in future.
Expand All @@ -47,12 +89,27 @@ mod data {
}

impl DataBuilder {
pub(super) fn new<F>(event_handler: F, compare_content: bool) -> Self
pub(super) fn new<F, G>(
event_handler: F,
compare_content: bool,
scan_emitter: Option<G>,
) -> Self
where
F: EventHandler,
G: ScanEventHandler,
{
let scan_emitter = match scan_emitter {
None => None,
Some(v) => {
// workaround for a weird type resolution bug when directly going to dyn Trait
let intermediate: Box<RefCell<dyn ScanEventHandler>> =
Box::new(RefCell::new(v));
Some(intermediate)
}
};
Self {
emitter: EventEmitter::new(event_handler),
scan_emitter,
build_hasher: compare_content.then(RandomState::default),
now: Instant::now(),
}
Expand Down Expand Up @@ -131,7 +188,7 @@ mod data {
}

let all_path_data =
Self::scan_all_path_data(data_builder, root.clone(), is_recursive).collect();
Self::scan_all_path_data(data_builder, root.clone(), is_recursive, true).collect();

Some(Self {
root,
Expand All @@ -148,7 +205,7 @@ mod data {
pub(super) fn rescan(&mut self, data_builder: &mut DataBuilder) {
// scan current filesystem.
for (path, new_path_data) in
Self::scan_all_path_data(data_builder, self.root.clone(), self.is_recursive)
Self::scan_all_path_data(data_builder, self.root.clone(), self.is_recursive, false)
{
let old_path_data = self
.all_path_data
Expand Down Expand Up @@ -191,7 +248,10 @@ mod data {
data_builder: &'_ DataBuilder,
root: PathBuf,
is_recursive: bool,
// whether this is an initial scan, used only for events
is_initial: bool,
) -> impl Iterator<Item = (PathBuf, PathData)> + '_ {
log::trace!("rescanning {root:?}");
// WalkDir return only one entry if root is a file (not a folder),
// so we can use single logic to do the both file & dir's jobs.
//
Expand All @@ -212,11 +272,25 @@ mod data {
// propagate to event handler. It may not consistent.
//
// FIXME: Should we emit all IO error events? Or ignore them all?
.filter_map(|entry| entry.ok())
.filter_map(|entry| match entry.metadata() {
.filter_map(|entry_res| match entry_res {
Ok(entry) => Some(entry),
Err(err) => {
log::warn!("walkdir error scanning {err:?}");
let crate_err =
crate::Error::new(crate::ErrorKind::Generic(err.to_string()));
data_builder.emitter.emit(Err(crate_err));
None
}
})
.filter_map(move |entry| match entry.metadata() {
Ok(metadata) => {
let path = entry.into_path();

if is_initial {
// emit initial scans
if let Some(ref emitter) = data_builder.scan_emitter {
emitter.borrow_mut().handle_event(Ok(path.clone()));
}
}
let meta_path = MetaPath::from_parts_unchecked(path, metadata);
let data_path = data_builder.build_path_data(&meta_path);

Expand Down Expand Up @@ -409,7 +483,28 @@ pub struct PollWatcher {
impl PollWatcher {
/// Create a new [PollWatcher], configured as needed.
pub fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<PollWatcher> {
let data_builder = DataBuilder::new(event_handler, config.compare_contents());
Self::with_opt::<_, ()>(event_handler, config, None)
}

/// Create a new [PollWatcher] with an scan event handler.
///
/// `scan_fallback` is called on the initial scan with all files seen by the pollwatcher.
pub fn with_initial_scan<F: EventHandler, G: ScanEventHandler>(
event_handler: F,
config: Config,
scan_callback: G,
) -> crate::Result<PollWatcher> {
Self::with_opt(event_handler, config, Some(scan_callback))
}

/// create a new pollwatcher with all options
fn with_opt<F: EventHandler, G: ScanEventHandler>(
event_handler: F,
config: Config,
scan_callback: Option<G>,
) -> crate::Result<PollWatcher> {
let data_builder =
DataBuilder::new(event_handler, config.compare_contents(), scan_callback);

let poll_watcher = PollWatcher {
watches: Default::default(),
Expand Down
8 changes: 6 additions & 2 deletions notify/src/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,11 @@ unsafe extern "system" fn handle_event(
};

if !skip {
log::trace!("Event: path = `{}`, action = {:?}", path.display(), (*cur_entry).Action);
log::trace!(
"Event: path = `{}`, action = {:?}",
path.display(),
(*cur_entry).Action
);

let newe = Event::new(EventKind::Any).add_path(path);

Expand Down Expand Up @@ -504,7 +508,7 @@ impl ReadDirectoryChangesWatcher {
}

impl Watcher for ReadDirectoryChangesWatcher {
fn new<F: EventHandler>(event_handler: F, config: Config) -> Result<Self> {
fn new<F: EventHandler>(event_handler: F, _config: Config) -> Result<Self> {
// create dummy channel for meta event
// TODO: determine the original purpose of this - can we remove it?
let (meta_tx, _) = unbounded();
Expand Down