Skip to content

Commit

Permalink
simplify deadlock avoidance algorithm a lot. run delay zero test on o…
Browse files Browse the repository at this point in the history
…ther platforms too
  • Loading branch information
mtak- authored and 0xpr03 committed Oct 16, 2019
1 parent 4cfdb66 commit 9a4437c
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 61 deletions.
4 changes: 2 additions & 2 deletions src/debounce/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::time::Duration;

pub type OperationsBuffer =
Arc<Mutex<HashMap<PathBuf, (Option<op::Op>, Option<PathBuf>, Option<u64>)>>>;
pub type OperationsBufferInner = HashMap<PathBuf, (Option<op::Op>, Option<PathBuf>, Option<u64>)>;
pub type OperationsBuffer = Arc<Mutex<OperationsBufferInner>>;

pub enum EventTx {
Raw {
Expand Down
113 changes: 55 additions & 58 deletions src/debounce/timer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::super::{op, DebouncedEvent};

use std::collections::VecDeque;
use std::ops::DerefMut;
use std::path::PathBuf;
use std::sync::mpsc;
use std::sync::{
Expand All @@ -10,9 +11,7 @@ use std::sync::{
use std::thread;
use std::time::{Duration, Instant};

use debounce::OperationsBuffer;

struct TryLockError(ScheduledEvent);
use debounce::{OperationsBuffer, OperationsBufferInner};

#[derive(PartialEq, Eq)]
struct ScheduledEvent {
Expand All @@ -32,70 +31,68 @@ struct ScheduleWorker {

impl ScheduleWorker {
fn fire_due_events(&self, now: Instant) -> Option<Instant> {
'a: loop {
let mut events = self.events.lock().unwrap();
while let Some(event) = events.pop_front() {
if event.when <= now {
match self.fire_event(event) {
Ok(()) => {}
Err(TryLockError(event)) => {
events.push_front(event);
drop(events);
std::thread::yield_now();
continue 'a
}
}
} else {
// not due yet, put it back
let next_when = event.when;
events.push_front(event);
return Some(next_when);
// simple deadlock avoidance loop.
let (mut events, mut op_buf) = loop {
let events = self.events.lock().unwrap();

// To avoid deadlock, we do a `try_lock`, and on `WouldBlock`, we unlock the
// events Mutex, and retry after yielding.
match self.operations_buffer.try_lock() {
Ok(op_buf) => break (events, op_buf),
Err(std::sync::TryLockError::Poisoned {..}) => return None,
Err(std::sync::TryLockError::WouldBlock) => {
// drop the lock before yielding to give other threads a chance to complete
// their work.
drop(events);
std::thread::yield_now();
}
}
break
};
while let Some(event) = events.pop_front() {
if event.when <= now {
self.fire_event(event, &mut op_buf)
} else {
// not due yet, put it back
let next_when = event.when;
events.push_front(event);
return Some(next_when);
}
}
None
}

fn fire_event(&self, ev: ScheduledEvent) -> Result<(), TryLockError> {
match self.operations_buffer.try_lock() {
Ok(ref mut op_buf) => {
let ScheduledEvent { path, .. } = ev;
if let Some((op, from_path, _)) = op_buf.remove(&path) {
let is_partial_rename = from_path.is_none();
if let Some(from_path) = from_path {
self.tx
.send(DebouncedEvent::Rename(from_path, path.clone()))
.unwrap();
}
let message = match op {
Some(op::Op::CREATE) => Some(DebouncedEvent::Create(path)),
Some(op::Op::WRITE) => Some(DebouncedEvent::Write(path)),
Some(op::Op::CHMOD) => Some(DebouncedEvent::Chmod(path)),
Some(op::Op::REMOVE) => Some(DebouncedEvent::Remove(path)),
Some(op::Op::RENAME) if is_partial_rename => {
if path.exists() {
Some(DebouncedEvent::Create(path))
} else {
Some(DebouncedEvent::Remove(path))
}
}
_ => None,
};
if let Some(m) = message {
let _ = self.tx.send(m);
fn fire_event(
&self,
ev: ScheduledEvent,
op_buf: &mut impl DerefMut<Target = OperationsBufferInner>
) {
let ScheduledEvent { path, .. } = ev;
if let Some((op, from_path, _)) = op_buf.remove(&path) {
let is_partial_rename = from_path.is_none();
if let Some(from_path) = from_path {
self.tx
.send(DebouncedEvent::Rename(from_path, path.clone()))
.unwrap();
}
let message = match op {
Some(op::Op::CREATE) => Some(DebouncedEvent::Create(path)),
Some(op::Op::WRITE) => Some(DebouncedEvent::Write(path)),
Some(op::Op::CHMOD) => Some(DebouncedEvent::Chmod(path)),
Some(op::Op::REMOVE) => Some(DebouncedEvent::Remove(path)),
Some(op::Op::RENAME) if is_partial_rename => {
if path.exists() {
Some(DebouncedEvent::Create(path))
} else {
Some(DebouncedEvent::Remove(path))
}
} else {
// TODO error!("path not found in operations_buffer: {}", path.display())
}
Ok(())
}
Err(std::sync::TryLockError::Poisoned { .. }) => {
Ok(())
}
Err(std::sync::TryLockError::WouldBlock) => {
Err(TryLockError(ev))
_ => None,
};
if let Some(m) = message {
let _ = self.tx.send(m);
}
} else {
// TODO error!("path not found in operations_buffer: {}", path.display())
}
}

Expand Down
1 change: 0 additions & 1 deletion tests/debounce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1414,7 +1414,6 @@ fn one_file_many_events() {

// https://github.com/passcod/notify/issues/205
#[test]
#[cfg_attr(not(target_os = "macos"), ignore)]
fn delay_zero() {
let tdir = TempDir::new("temp_dir").expect("failed to create temporary directory");

Expand Down

0 comments on commit 9a4437c

Please sign in to comment.