diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 6d5a5841..084287c3 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -18,7 +18,7 @@ jobs: strategy: matrix: version: - - 1.47.0 # MSRV + - 1.50.0 # MSRV - stable - nightly os: [ubuntu-latest, macos-latest, windows-latest] diff --git a/Cargo.toml b/Cargo.toml index d5e7fd40..f687e975 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,6 +49,7 @@ mio = { version = "0.8", features = ["os-ext"] } futures = "0.3" serde_json = "1.0.39" tempfile = "3.2.0" +nix = "0.23.1" [features] default = ["macos_fsevent"] diff --git a/examples/poll_sysfs.rs b/examples/poll_sysfs.rs new file mode 100644 index 00000000..33075846 --- /dev/null +++ b/examples/poll_sysfs.rs @@ -0,0 +1,54 @@ +use notify::poll::PollWatcherConfig; +use notify::{PollWatcher, RecursiveMode, Watcher}; +use std::path::Path; +use std::time::Duration; + +#[cfg(not(target_os = "windows"))] +fn not_windows_main() -> notify::Result<()> { + let mut paths: Vec<_> = std::env::args() + .skip(1) + .map(|arg| Path::new(&arg).to_path_buf()) + .collect(); + if paths.is_empty() { + let lo_stats = Path::new("/sys/class/net/lo/statistics/tx_bytes").to_path_buf(); + if !lo_stats.exists() { + eprintln!("Must provide path to watch, default system path was not found (probably you're not running on Linux?)"); + std::process::exit(1); + } + println!( + "Trying {:?}, use `ping localhost` to see changes!", + lo_stats + ); + paths.push(lo_stats); + } + + println!("watching {:?}...", paths); + + let config = PollWatcherConfig { + compare_contents: true, + poll_interval: Duration::from_secs(2), + }; + let (tx, rx) = std::sync::mpsc::channel(); + let mut watcher = PollWatcher::with_config(tx, config)?; + for path in paths { + watcher.watch(&path, RecursiveMode::Recursive)?; + } + + for res in rx { + match res { + Ok(event) => println!("changed: {:?}", event), + Err(e) => println!("watch error: {:?}", e), + } + } + + Ok(()) +} + +fn main() -> notify::Result<()> { + #[cfg(not(target_os = "windows"))] + { + not_windows_main() + } + #[cfg(target_os = "windows")] + notify::Result::Ok(()) +} diff --git a/src/poll.rs b/src/poll.rs index 04e60caa..95cd9a5f 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -6,9 +6,13 @@ use super::event::*; use super::{Error, EventHandler, RecursiveMode, Result, Watcher}; use filetime::FileTime; +use std::collections::hash_map::RandomState; use std::collections::HashMap; use std::fmt::Debug; -use std::fs; +use std::fs::Metadata; +use std::hash::BuildHasher; +use std::hash::Hasher; +use std::io::{ErrorKind, Read}; use std::path::{Path, PathBuf}; use std::sync::{ atomic::{AtomicBool, Ordering}, @@ -16,11 +20,13 @@ use std::sync::{ }; use std::thread; use std::time::{Duration, Instant}; +use std::{fs, io}; use walkdir::WalkDir; -#[derive(Debug)] +#[derive(Debug, Clone)] struct PathData { mtime: i64, + hash: Option, last_check: Instant, } @@ -36,6 +42,33 @@ pub struct PollWatcher { watches: Arc>>, open: Arc, delay: Duration, + compare_contents: bool, +} + +/// General purpose configuration for [`PollWatcher`] specifically. Can be used to tune +/// this watcher differently than the other platform specific ones. +#[derive(Debug, Clone)] +pub struct PollWatcherConfig { + /// Interval between each rescan attempt. This can be extremely expensive for large + /// file trees so it is recommended to measure and tune accordingly. + pub poll_interval: Duration, + + /// Optional feature that will evaluate the contents of changed files to determine if + /// they have indeed changed using a fast hashing algorithm. This is especially important + /// for pseudo filesystems like those on Linux under /sys and /proc which are not obligated + /// to respect any other filesystem norms such as modification timestamps, file sizes, etc. + /// By enabling this feature, performance will be significantly impacted as all files will + /// need to be read and hashed at each `poll_interval`. + pub compare_contents: bool, +} + +impl Default for PollWatcherConfig { + fn default() -> Self { + Self { + poll_interval: Duration::from_secs(30), + compare_contents: false, + } + } } impl Debug for PollWatcher { @@ -45,6 +78,7 @@ impl Debug for PollWatcher { .field("watches", &self.watches) .field("open", &self.open) .field("delay", &self.delay) + .field("compare_contents", &self.compare_contents) .finish() } } @@ -56,14 +90,66 @@ fn emit_event(event_handler: &Mutex, res: Result) { } } +impl PathData { + pub fn collect( + path: &Path, + metadata: &Metadata, + build_hasher: Option<&BH>, + last_check: Instant, + ) -> Self { + let mtime = FileTime::from_last_modification_time(metadata).seconds(); + let hash = metadata + .is_file() + .then(|| build_hasher.and_then(|bh| Self::hash_file(path, bh).ok())) + .flatten(); + Self { + mtime, + hash, + last_check, + } + } + + fn hash_file, BH: BuildHasher>(path: P, build_hasher: &BH) -> io::Result { + let mut hasher = build_hasher.build_hasher(); + let mut file = fs::File::open(path)?; + let mut buf = [0; 512]; + loop { + let n = match file.read(&mut buf) { + Ok(0) => break, + Ok(len) => len, + Err(ref e) if e.kind() == ErrorKind::Interrupted => continue, + Err(e) => return Err(e), + }; + hasher.write(&buf[..n]); + } + Ok(hasher.finish()) + } + + pub fn detect_change(&self, other: &PathData) -> Option { + if self.mtime > other.mtime { + Some(EventKind::Modify(ModifyKind::Metadata( + MetadataKind::WriteTime, + ))) + } else if self.hash != other.hash { + Some(EventKind::Modify(ModifyKind::Data(DataChange::Any))) + } else { + None + } + } +} + impl PollWatcher { - /// Create a new [PollWatcher] and set the poll frequency to `delay`. - pub fn with_delay(event_handler: F, delay: Duration) -> Result { + /// Create a new [PollWatcher], configured as needed. + pub fn with_config( + event_handler: F, + config: PollWatcherConfig, + ) -> Result { let mut p = PollWatcher { event_handler: Arc::new(Mutex::new(event_handler)), watches: Arc::new(Mutex::new(HashMap::new())), open: Arc::new(AtomicBool::new(true)), - delay, + delay: config.poll_interval, + compare_contents: config.compare_contents, }; p.run(); Ok(p) @@ -73,6 +159,7 @@ impl PollWatcher { let watches = self.watches.clone(); let open = self.open.clone(); let delay = self.delay; + let build_hasher = self.compare_contents.then(RandomState::default); let event_handler = self.event_handler.clone(); let event_handler = move |res| emit_event(&event_handler, res); @@ -108,26 +195,20 @@ impl PollWatcher { } Ok(metadata) => { if !metadata.is_dir() { - let mtime = - FileTime::from_last_modification_time(&metadata) - .seconds(); - match paths.insert( - watch.clone(), - PathData { - mtime, - last_check: current_time, - }, - ) { + let path_data = PathData::collect( + watch, + &metadata, + build_hasher.as_ref(), + current_time, + ); + match paths.insert(watch.clone(), path_data.clone()) { None => { unreachable!(); } - Some(PathData { - mtime: old_mtime, .. - }) => { - if mtime > old_mtime { - let kind = MetadataKind::WriteTime; - let meta = ModifyKind::Metadata(kind); - let kind = EventKind::Modify(meta); + Some(old_path_data) => { + if let Some(kind) = + path_data.detect_change(&old_path_data) + { let ev = Event::new(kind).add_path(watch.clone()); event_handler(Ok(ev)); @@ -144,7 +225,6 @@ impl PollWatcher { .filter_map(|e| e.ok()) { let path = entry.path(); - match entry.metadata() { Err(e) => { let err = Error::io(e.into()) @@ -152,15 +232,15 @@ impl PollWatcher { event_handler(Err(err)); } Ok(m) => { - let mtime = - FileTime::from_last_modification_time(&m) - .seconds(); + let path_data = PathData::collect( + path, + &m, + build_hasher.as_ref(), + current_time, + ); match paths.insert( path.to_path_buf(), - PathData { - mtime, - last_check: current_time, - }, + path_data.clone(), ) { None => { let kind = @@ -169,14 +249,10 @@ impl PollWatcher { .add_path(path.to_path_buf()); event_handler(Ok(ev)); } - Some(PathData { - mtime: old_mtime, .. - }) => { - if mtime > old_mtime { - let kind = MetadataKind::WriteTime; - let meta = - ModifyKind::Metadata(kind); - let kind = EventKind::Modify(meta); + Some(old_path_data) => { + if let Some(kind) = path_data + .detect_change(&old_path_data) + { // TODO add new mtime as attr let ev = Event::new(kind) .add_path(path.to_path_buf()); @@ -214,6 +290,8 @@ impl PollWatcher { } fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> { + let build_hasher = self.compare_contents.then(RandomState::default); + if let Ok(mut watches) = self.watches.lock() { let current_time = Instant::now(); @@ -228,14 +306,9 @@ impl PollWatcher { let mut paths = HashMap::new(); if !metadata.is_dir() { - let mtime = FileTime::from_last_modification_time(&metadata).seconds(); - paths.insert( - watch.clone(), - PathData { - mtime, - last_check: current_time, - }, - ); + let path_data = + PathData::collect(path, &metadata, build_hasher.as_ref(), current_time); + paths.insert(watch.clone(), path_data); } else { let depth = if recursive_mode.is_recursive() { usize::max_value() @@ -256,14 +329,13 @@ impl PollWatcher { emit_event(&self.event_handler, Err(err)); } Ok(m) => { - let mtime = FileTime::from_last_modification_time(&m).seconds(); - paths.insert( - path.to_path_buf(), - PathData { - mtime, - last_check: current_time, - }, + let path_data = PathData::collect( + path, + &m, + build_hasher.as_ref(), + current_time, ); + paths.insert(path.to_path_buf(), path_data); } } } @@ -297,8 +369,7 @@ impl Watcher for PollWatcher { /// The default poll frequency is 30 seconds. /// Use [with_delay] to manually set the poll frequency. fn new(event_handler: F) -> Result { - let delay = Duration::from_secs(30); - Self::with_delay(event_handler, delay) + Self::with_config(event_handler, PollWatcherConfig::default()) } fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> { diff --git a/tests/poll-watcher-hashing.rs b/tests/poll-watcher-hashing.rs new file mode 100644 index 00000000..dfdf16fc --- /dev/null +++ b/tests/poll-watcher-hashing.rs @@ -0,0 +1,127 @@ +#![cfg(not(target_os = "windows"))] +use nix::sys::stat::futimens; +use std::fs::{File, OpenOptions}; +use std::io::Write; +use std::os::unix::io::AsRawFd; +use std::path::{Path, PathBuf}; +use std::sync::mpsc::Receiver; +use std::time::{Duration, SystemTime}; +use std::{fs, sync, thread}; + +use nix::sys::time::TimeSpec; +use tempfile::TempDir; + +use notify::event::{CreateKind, DataChange, MetadataKind, ModifyKind}; +use notify::poll::PollWatcherConfig; +use notify::{Event, EventKind, PollWatcher, RecursiveMode, Watcher}; + +#[test] +fn test_poll_watcher_distinguish_modify_kind() { + let mut harness = TestHarness::setup(); + harness.watch_tempdir(); + + let testfile = harness.create_file("testfile"); + harness.expect_recv(&testfile, EventKind::Create(CreateKind::Any)); + harness.advance_clock(); + + harness.write_file(&testfile, "data1"); + harness.expect_recv( + &testfile, + EventKind::Modify(ModifyKind::Metadata(MetadataKind::WriteTime)), + ); + harness.advance_clock(); + + harness.write_file_keep_time(&testfile, "data2"); + harness.expect_recv( + &testfile, + EventKind::Modify(ModifyKind::Data(DataChange::Any)), + ); + harness.advance_clock(); + + harness.write_file(&testfile, "data2"); + harness.expect_recv( + &testfile, + EventKind::Modify(ModifyKind::Metadata(MetadataKind::WriteTime)), + ); +} + +struct TestHarness { + testdir: TempDir, + watcher: PollWatcher, + rx: Receiver>, +} + +impl TestHarness { + pub fn setup() -> Self { + let tempdir = tempfile::tempdir().unwrap(); + + let config = PollWatcherConfig { + compare_contents: true, + poll_interval: Duration::from_millis(10), + }; + let (tx, rx) = sync::mpsc::channel(); + let watcher = PollWatcher::with_config( + move |event: notify::Result| { + tx.send(event).unwrap(); + }, + config, + ) + .unwrap(); + + Self { + testdir: tempdir, + watcher, + rx, + } + } + + pub fn watch_tempdir(&mut self) { + self.watcher + .watch(self.testdir.path(), RecursiveMode::Recursive) + .unwrap(); + } + + pub fn create_file(&self, name: &str) -> PathBuf { + let path = self.testdir.path().join(name); + fs::File::create(&path).unwrap(); + path + } + + pub fn write_file>(&self, path: P, contents: &str) { + self.write_file_common(path.as_ref(), contents); + } + + pub fn write_file_keep_time>(&self, path: P, contents: &str) { + let metadata = fs::metadata(path.as_ref()).unwrap(); + let file = self.write_file_common(path.as_ref(), contents); + let atime = Self::to_timespec(metadata.accessed().unwrap()); + let mtime = Self::to_timespec(metadata.modified().unwrap()); + futimens(file.as_raw_fd(), &atime, &mtime).unwrap(); + } + + fn write_file_common(&self, path: &Path, contents: &str) -> File { + let mut file = OpenOptions::new().write(true).open(path).unwrap(); + file.write_all(contents.as_bytes()).unwrap(); + file + } + + fn to_timespec(t: SystemTime) -> TimeSpec { + TimeSpec::from_duration(t.duration_since(SystemTime::UNIX_EPOCH).unwrap()) + } + + pub fn advance_clock(&self) { + // Unfortunately this entire crate is pretty dependent on real syscall behaviour so let's + // test "for real" and require a sleep long enough to trigger mtime actually increasing. + thread::sleep(Duration::from_secs(1)); + } + + fn expect_recv>(&self, expected_path: P, expected_kind: EventKind) { + let actual = self + .rx + .recv_timeout(Duration::from_secs(15)) + .unwrap() + .expect("Watch I/O error not expected under test"); + assert_eq!(actual.paths, vec![expected_path.as_ref().to_path_buf()]); + assert_eq!(expected_kind, actual.kind); + } +}