Skip to content

Commit

Permalink
feat: reliably detect config file modification (#84)
Browse files Browse the repository at this point in the history
As reported in #80, the fs watcher previously cannot detect config
changes made using vim.

Vim's default save file behavior (controlled by `backupcopy` option) is
to save the file under a new name and then rename it to override the
original file. As such, the inode of the previously watched file is gone
and will no longer triggers inotify events.

This PR addresses the issue by additionally detecting file remove
events, and re-launch the watcher on the same path when the file is
found deleted.

In case the config file is truly deleted instead of being overridden,
the watcher will wait indefinitely until the file has been recreated.

Fixes #80.
  • Loading branch information
shouya authored Mar 14, 2024
1 parent cc1a81b commit b249791
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 65 deletions.
75 changes: 10 additions & 65 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,20 @@ mod endpoint;
mod feed_service;
#[cfg(feature = "inspector-ui")]
mod inspector;
mod watcher;

use std::{path::Path, sync::Arc};

use axum::{routing::get, Extension, Router};
use clap::Parser;
use http::StatusCode;
use tokio::sync::mpsc;
use tower_http::compression::CompressionLayer;
use tracing::{error, info, warn};
use tracing::{info, warn};

use crate::{
cli::RootConfig,
util::{ConfigError, Result},
};
use crate::{cli::RootConfig, util::Result};
pub use endpoint::{EndpointConfig, EndpointParam};

use self::feed_service::FeedService;
use self::{feed_service::FeedService, watcher::Watcher};

#[derive(Parser, Clone)]
pub struct ServerConfig {
Expand Down Expand Up @@ -72,13 +69,17 @@ impl ServerConfig {
let feed_service = FeedService::try_from(config).await?;

// watcher must not be dropped until the end of the function
let (_watcher, mut config_update) = fs_watcher(config_path)?;
let mut watcher = Watcher::new(config_path)?;
let mut change_alert =
watcher.take_change_alert().expect("change alert taken");

tokio::task::spawn(watcher.run());

// signal for reload on config update
let feed_service_clone = feed_service.clone();
let config_path_clone = config_path.to_owned();
tokio::task::spawn(async move {
while config_update.recv().await.is_some() {
while change_alert.recv().await.is_some() {
info!("config updated, reloading service");
if !feed_service_clone.reload(&config_path_clone).await {
feed_service_clone
Expand Down Expand Up @@ -125,59 +126,3 @@ impl ServerConfig {
Ok(server.await?)
}
}

fn fs_watcher(
config_path: &Path,
) -> Result<(notify::RecommendedWatcher, mpsc::Receiver<()>)> {
use notify::{Event, RecursiveMode, Watcher};
let (tx, rx) = mpsc::channel(1);

let event_handler = move |event: Result<Event, notify::Error>| match event {
Ok(event) if event.kind.is_modify() => {
tx.blocking_send(()).unwrap();
}
Ok(_) => {}
Err(_) => {
error!("file watcher error: {:?}", event);
}
};

let mut watcher =
notify::recommended_watcher(event_handler).map_err(|e| {
ConfigError::Message(format!("failed to create file watcher: {:?}", e))
})?;

watcher
.watch(config_path, RecursiveMode::NonRecursive)
.map_err(|e| {
ConfigError::Message(format!("failed to watch file: {:?}", e))
})?;

// sometimes the editor may touch the file multiple times in quick
// succession when saving, so we debounce the events
let rx = debounce(std::time::Duration::from_millis(500), rx);
Ok((watcher, rx))
}

fn debounce<T: Send + 'static>(
duration: std::time::Duration,
mut rx: mpsc::Receiver<T>,
) -> mpsc::Receiver<T> {
let (debounced_tx, debounced_rx) = mpsc::channel(1);
tokio::task::spawn(async move {
let mut last = None;
loop {
tokio::select! {
val = rx.recv() => {
last = val;
}
_ = tokio::time::sleep(duration) => {
if let Some(val) = last.take() {
debounced_tx.send(val).await.unwrap();
}
}
}
}
});
debounced_rx
}
133 changes: 133 additions & 0 deletions src/server/watcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
use std::{
path::{Path, PathBuf},
time::Duration,
};

use tokio::{
sync::mpsc::{self, Receiver, Sender},
time::sleep,
};
use tracing::{error, warn};

use crate::util::{ConfigError, Result};

pub struct Watcher {
path: PathBuf,
watcher: Option<notify::RecommendedWatcher>,
rx: Option<Receiver<()>>,
tx: Sender<()>,
reload_tx: Sender<()>,
reload_rx: Receiver<()>,
}

impl Watcher {
pub fn new(path: &Path) -> Result<Self> {
let (tx, rx) = mpsc::channel(1);
let (reload_tx, reload_rx) = mpsc::channel(1);

// sometimes the editor may touch the file multiple times in quick
// succession when saving, so we debounce the events
let rx = debounce(Duration::from_millis(500), rx);

Ok(Self {
path: path.to_owned(),
watcher: None,
reload_tx,
reload_rx,
rx: Some(rx),
tx,
})
}

pub fn take_change_alert(&mut self) -> Option<Receiver<()>> {
self.rx.take()
}

pub async fn run(mut self) -> Result<()> {
self.setup().await?;

loop {
self.reload_rx.recv().await.unwrap();
self.setup().await?;
// the file is re-created, trigger a reload
self.tx.send(()).await.unwrap();
}
}

async fn setup(&mut self) -> Result<()> {
use notify::{
event::{ModifyKind, RemoveKind},
Event, EventKind, RecursiveMode, Watcher,
};

let tx = self.tx.clone();
let reload_tx = self.reload_tx.clone();
let event_handler = move |event: Result<Event, notify::Error>| match event {
Ok(Event {
kind: EventKind::Modify(ModifyKind::Data(_)),
..
}) => {
tx.blocking_send(()).unwrap();
}
Ok(Event {
kind: EventKind::Remove(RemoveKind::File),
..
}) => {
// Captures vim's backupcopy=yes behavior. The file is likely
// renamed and deleted, try monitor the same file name again.
reload_tx.blocking_send(()).unwrap();
}
Ok(_event) => {}
Err(_) => {
error!("file watcher error: {:?}", event);
}
};

let mut watcher =
notify::recommended_watcher(event_handler).map_err(|e| {
ConfigError::Message(format!("failed to create file watcher: {:?}", e))
})?;

// if the file does not exist, simply wait for it to be created
while !self.path.exists() {
warn!(
"{} does not exist, waiting for it to be created",
self.path.display()
);
sleep(Duration::from_secs(10)).await;
}

watcher
.watch(&self.path, RecursiveMode::NonRecursive)
.map_err(|e| {
ConfigError::Message(format!("failed to watch file: {:?}", e))
})?;

self.watcher.replace(watcher);

Ok(())
}
}

fn debounce<T: Send + 'static>(
duration: Duration,
mut rx: Receiver<T>,
) -> Receiver<T> {
let (debounced_tx, debounced_rx) = mpsc::channel(1);
tokio::task::spawn(async move {
let mut last = None;
loop {
tokio::select! {
val = rx.recv() => {
last = val;
}
_ = sleep(duration) => {
if let Some(val) = last.take() {
debounced_tx.send(val).await.unwrap();
}
}
}
}
});
debounced_rx
}

0 comments on commit b249791

Please sign in to comment.