Skip to content

Commit d20c161

Browse files
committed
fix: unwatching revived by failed watch could remove ongoing watching
1 parent 04385bf commit d20c161

File tree

2 files changed

+30
-0
lines changed

2 files changed

+30
-0
lines changed

src/session/depot.rs

+6
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,9 @@ impl Depot {
133133
if let Some(SessionOperation { responser, .. }) = self.unwatching_paths.remove(&(path, mode)) {
134134
responser.send_empty();
135135
}
136+
if let Some(SessionOperation { responser, .. }) = self.unwatching_paths.remove(&(path, WatchMode::Any)) {
137+
responser.send_empty();
138+
}
136139
}
137140

138141
pub fn fail_watch(&mut self, path: &str, mode: WatchMode) {
@@ -144,6 +147,9 @@ impl Depot {
144147
if let Some(operation) = self.unwatching_paths.remove(&(path, mode)) {
145148
self.push_operation(Operation::Session(operation));
146149
}
150+
if self.has_watching_requests(path) {
151+
return;
152+
}
147153
if let Some(operation) = self.unwatching_paths.remove(&(path, WatchMode::Any)) {
148154
self.push_operation(Operation::Session(operation));
149155
}

tests/zookeeper.rs

+24
Original file line numberDiff line numberDiff line change
@@ -1062,6 +1062,30 @@ async fn test_persistent_watcher_passive_remove() {
10621062
assert_eq!(child_event.path, "/");
10631063
}
10641064

1065+
#[test_log::test(tokio::test)]
1066+
async fn test_fail_watch_with_concurrent_passive_remove() {
1067+
let docker = DockerCli::default();
1068+
let zookeeper = docker.run(zookeeper_image());
1069+
let zk_port = zookeeper.get_host_port(2181);
1070+
let cluster = format!("127.0.0.1:{}", zk_port);
1071+
1072+
let client = zk::Client::connect(&cluster).await.unwrap();
1073+
1074+
let recursive_watcher = client.watch("/a", zk::AddWatchMode::PersistentRecursive).await.unwrap();
1075+
let data_watching = client.get_and_watch_data("/a");
1076+
let persistent_watching = client.watch("/a", zk::AddWatchMode::Persistent);
1077+
drop(recursive_watcher);
1078+
1079+
assert_that!(data_watching.await.unwrap_err()).is_equal_to(zk::Error::NoNode);
1080+
let mut persistent_watcher = persistent_watching.await.unwrap();
1081+
1082+
client.create("/a", b"a", PERSISTENT_OPEN).await.unwrap();
1083+
1084+
let event = persistent_watcher.changed().await;
1085+
assert_that!(event.event_type).is_equal_to(zk::EventType::NodeCreated);
1086+
assert_that!(event.path).is_same_string_to("/a");
1087+
}
1088+
10651089
#[test_log::test(tokio::test)]
10661090
async fn test_persistent_watcher() {
10671091
let docker = DockerCli::default();

0 commit comments

Comments
 (0)