From b249791729ba126e73da7127a7260d4e2a9714f0 Mon Sep 17 00:00:00 2001 From: shouya <526598+shouya@users.noreply.github.com> Date: Thu, 14 Mar 2024 23:03:34 +0900 Subject: [PATCH] feat: reliably detect config file modification (#84) As reported in #80, the fs watcher previously cannot detect config changes made using vim. Vim's default save file behavior (controlled by `backupcopy` option) is to save the file under a new name and then rename it to override the original file. As such, the inode of the previously watched file is gone and will no longer triggers inotify events. This PR addresses the issue by additionally detecting file remove events, and re-launch the watcher on the same path when the file is found deleted. In case the config file is truly deleted instead of being overridden, the watcher will wait indefinitely until the file has been recreated. Fixes #80. --- src/server.rs | 75 ++++-------------------- src/server/watcher.rs | 133 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 143 insertions(+), 65 deletions(-) create mode 100644 src/server/watcher.rs diff --git a/src/server.rs b/src/server.rs index e2ca651..b717012 100644 --- a/src/server.rs +++ b/src/server.rs @@ -3,23 +3,20 @@ mod endpoint; mod feed_service; #[cfg(feature = "inspector-ui")] mod inspector; +mod watcher; use std::{path::Path, sync::Arc}; use axum::{routing::get, Extension, Router}; use clap::Parser; use http::StatusCode; -use tokio::sync::mpsc; use tower_http::compression::CompressionLayer; -use tracing::{error, info, warn}; +use tracing::{info, warn}; -use crate::{ - cli::RootConfig, - util::{ConfigError, Result}, -}; +use crate::{cli::RootConfig, util::Result}; pub use endpoint::{EndpointConfig, EndpointParam}; -use self::feed_service::FeedService; +use self::{feed_service::FeedService, watcher::Watcher}; #[derive(Parser, Clone)] pub struct ServerConfig { @@ -72,13 +69,17 @@ impl ServerConfig { let feed_service = FeedService::try_from(config).await?; // watcher must not be dropped until the end of the function - let (_watcher, mut config_update) = fs_watcher(config_path)?; + let mut watcher = Watcher::new(config_path)?; + let mut change_alert = + watcher.take_change_alert().expect("change alert taken"); + + tokio::task::spawn(watcher.run()); // signal for reload on config update let feed_service_clone = feed_service.clone(); let config_path_clone = config_path.to_owned(); tokio::task::spawn(async move { - while config_update.recv().await.is_some() { + while change_alert.recv().await.is_some() { info!("config updated, reloading service"); if !feed_service_clone.reload(&config_path_clone).await { feed_service_clone @@ -125,59 +126,3 @@ impl ServerConfig { Ok(server.await?) } } - -fn fs_watcher( - config_path: &Path, -) -> Result<(notify::RecommendedWatcher, mpsc::Receiver<()>)> { - use notify::{Event, RecursiveMode, Watcher}; - let (tx, rx) = mpsc::channel(1); - - let event_handler = move |event: Result| match event { - Ok(event) if event.kind.is_modify() => { - tx.blocking_send(()).unwrap(); - } - Ok(_) => {} - Err(_) => { - error!("file watcher error: {:?}", event); - } - }; - - let mut watcher = - notify::recommended_watcher(event_handler).map_err(|e| { - ConfigError::Message(format!("failed to create file watcher: {:?}", e)) - })?; - - watcher - .watch(config_path, RecursiveMode::NonRecursive) - .map_err(|e| { - ConfigError::Message(format!("failed to watch file: {:?}", e)) - })?; - - // sometimes the editor may touch the file multiple times in quick - // succession when saving, so we debounce the events - let rx = debounce(std::time::Duration::from_millis(500), rx); - Ok((watcher, rx)) -} - -fn debounce( - duration: std::time::Duration, - mut rx: mpsc::Receiver, -) -> mpsc::Receiver { - let (debounced_tx, debounced_rx) = mpsc::channel(1); - tokio::task::spawn(async move { - let mut last = None; - loop { - tokio::select! { - val = rx.recv() => { - last = val; - } - _ = tokio::time::sleep(duration) => { - if let Some(val) = last.take() { - debounced_tx.send(val).await.unwrap(); - } - } - } - } - }); - debounced_rx -} diff --git a/src/server/watcher.rs b/src/server/watcher.rs new file mode 100644 index 0000000..a3f8bd7 --- /dev/null +++ b/src/server/watcher.rs @@ -0,0 +1,133 @@ +use std::{ + path::{Path, PathBuf}, + time::Duration, +}; + +use tokio::{ + sync::mpsc::{self, Receiver, Sender}, + time::sleep, +}; +use tracing::{error, warn}; + +use crate::util::{ConfigError, Result}; + +pub struct Watcher { + path: PathBuf, + watcher: Option, + rx: Option>, + tx: Sender<()>, + reload_tx: Sender<()>, + reload_rx: Receiver<()>, +} + +impl Watcher { + pub fn new(path: &Path) -> Result { + let (tx, rx) = mpsc::channel(1); + let (reload_tx, reload_rx) = mpsc::channel(1); + + // sometimes the editor may touch the file multiple times in quick + // succession when saving, so we debounce the events + let rx = debounce(Duration::from_millis(500), rx); + + Ok(Self { + path: path.to_owned(), + watcher: None, + reload_tx, + reload_rx, + rx: Some(rx), + tx, + }) + } + + pub fn take_change_alert(&mut self) -> Option> { + self.rx.take() + } + + pub async fn run(mut self) -> Result<()> { + self.setup().await?; + + loop { + self.reload_rx.recv().await.unwrap(); + self.setup().await?; + // the file is re-created, trigger a reload + self.tx.send(()).await.unwrap(); + } + } + + async fn setup(&mut self) -> Result<()> { + use notify::{ + event::{ModifyKind, RemoveKind}, + Event, EventKind, RecursiveMode, Watcher, + }; + + let tx = self.tx.clone(); + let reload_tx = self.reload_tx.clone(); + let event_handler = move |event: Result| match event { + Ok(Event { + kind: EventKind::Modify(ModifyKind::Data(_)), + .. + }) => { + tx.blocking_send(()).unwrap(); + } + Ok(Event { + kind: EventKind::Remove(RemoveKind::File), + .. + }) => { + // Captures vim's backupcopy=yes behavior. The file is likely + // renamed and deleted, try monitor the same file name again. + reload_tx.blocking_send(()).unwrap(); + } + Ok(_event) => {} + Err(_) => { + error!("file watcher error: {:?}", event); + } + }; + + let mut watcher = + notify::recommended_watcher(event_handler).map_err(|e| { + ConfigError::Message(format!("failed to create file watcher: {:?}", e)) + })?; + + // if the file does not exist, simply wait for it to be created + while !self.path.exists() { + warn!( + "{} does not exist, waiting for it to be created", + self.path.display() + ); + sleep(Duration::from_secs(10)).await; + } + + watcher + .watch(&self.path, RecursiveMode::NonRecursive) + .map_err(|e| { + ConfigError::Message(format!("failed to watch file: {:?}", e)) + })?; + + self.watcher.replace(watcher); + + Ok(()) + } +} + +fn debounce( + duration: Duration, + mut rx: Receiver, +) -> Receiver { + let (debounced_tx, debounced_rx) = mpsc::channel(1); + tokio::task::spawn(async move { + let mut last = None; + loop { + tokio::select! { + val = rx.recv() => { + last = val; + } + _ = sleep(duration) => { + if let Some(val) = last.take() { + debounced_tx.send(val).await.unwrap(); + } + } + } + } + }); + debounced_rx +}