Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix wrong NoWatcher error code and possible session disconnected in watcher remove #24

Merged
merged 2 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/proto/error_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub enum ErrorCode {
AuthFailed = -115,
SessionMoved = -118,
NotReadOnly = -119,
NoWatcher = -122,
NoWatcher = -121,
ReconfigDisabled = -123,
SessionClosedRequireSaslAuth = -124,
QuotaExceeded = -125,
Expand Down
14 changes: 9 additions & 5 deletions src/session/depot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ impl Depot {
}
}

fn push_request(&mut self, mut operation: SessionOperation) {
operation.request.set_xid(self.xid.next());
self.push_operation(Operation::Session(operation));
}

pub fn pop_ping(&mut self) -> Result<(), Error> {
self.pop_request(PredefinedXid::Ping.into()).map(|_| ())
}
Expand Down Expand Up @@ -145,13 +150,13 @@ impl Depot {
if *count == 0 {
self.watching_paths.remove(&(path, mode));
if let Some(operation) = self.unwatching_paths.remove(&(path, mode)) {
self.push_operation(Operation::Session(operation));
self.push_request(operation);
}
if self.has_watching_requests(path) {
return;
}
if let Some(operation) = self.unwatching_paths.remove(&(path, WatchMode::Any)) {
self.push_operation(Operation::Session(operation));
self.push_request(operation);
}
}
}
Expand All @@ -166,7 +171,7 @@ impl Depot {
self.cancel_unwatch(path, mode);
}

pub fn push_session(&mut self, mut operation: SessionOperation) {
pub fn push_session(&mut self, operation: SessionOperation) {
let info = operation.request.get_operation_info();
log::debug!("ZooKeeper operation request: {:?}", info);
if let (op_code, OpStat::Watch { path, mode }) = info {
Expand All @@ -185,8 +190,7 @@ impl Depot {
self.watching_paths.insert((path, mode), count);
}
}
operation.request.set_xid(self.xid.next());
self.push_operation(Operation::Session(operation));
self.push_request(operation);
}

pub fn push_remove_watch(&mut self, path: &str, mode: WatchMode, responser: StateResponser) {
Expand Down
2 changes: 1 addition & 1 deletion src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ impl Session {
) {
if let Err(err) = self.serve_session(&sock, buf, depot, requester, unwatch_requester).await {
self.resolve_serve_error(&err);
log::debug!("ZooKeeper session {} state {} error {}", self.session_id, self.session_state, err);
log::info!("ZooKeeper session {} state {} error {}", self.session_id, self.session_state, err);
depot.error(&err);
} else {
self.change_state(SessionState::Disconnected);
Expand Down
57 changes: 57 additions & 0 deletions tests/zookeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1164,6 +1164,35 @@ async fn test_persistent_watcher_passive_remove() {
assert_eq!(child_event.path, "/");
}

#[test_log::test(tokio::test)]
async fn test_fail_watch_with_multiple_unwatching() {
let docker = DockerCli::default();
let zookeeper = docker.run(zookeeper_image());
let zk_port = zookeeper.get_host_port(2181);
let cluster = format!("127.0.0.1:{}", zk_port);

let client = zk::Client::connect(&cluster).await.unwrap();

let (_, exist_watcher1) = client.check_and_watch_stat("/a1").await.unwrap();
let (_, exist_watcher2) = client.check_and_watch_stat("/a2").await.unwrap();

let mut state_watcher = client.state_watcher();

let data_watching1 = client.get_and_watch_data("/a1");
let data_watching2 = client.get_and_watch_data("/a2");

drop(exist_watcher1);
drop(exist_watcher2);

assert_that!(data_watching1.await.unwrap_err()).is_equal_to(zk::Error::NoNode);
assert_that!(data_watching2.await.unwrap_err()).is_equal_to(zk::Error::NoNode);

select! {
state = state_watcher.changed() => panic!("expect no state update, but got {state}"),
_ = tokio::time::sleep(Duration::from_millis(10)) => {},
}
}

#[test_log::test(tokio::test)]
async fn test_fail_watch_with_concurrent_passive_remove() {
let docker = DockerCli::default();
Expand Down Expand Up @@ -1375,6 +1404,34 @@ async fn test_watcher_coexist_on_same_path() {
assert_that!(recursive_watcher.changed().await).is_equal_to(&expected);
}

// Use "current_thread" explicitly.
#[test_log::test(tokio::test(flavor = "current_thread"))]
async fn test_remove_no_watcher() {
let docker = DockerCli::default();
let zookeeper = docker.run(zookeeper_image());
let zk_port = zookeeper.get_host_port(2181);
let cluster = format!("127.0.0.1:{}", zk_port);

let client = zk::Client::connect(&cluster).await.unwrap();

let (_, exist_watcher) = client.check_and_watch_stat("/a").await.unwrap();
let create = client.create("/a", &vec![], PERSISTENT_OPEN);

// Let session task issue `create` request first, oneshot watch will be removed by server.
tokio::task::yield_now().await;

// Issue `RemoveWatches` which likely happen before watch event notification as it involves
// several IO paths.
assert_that!(exist_watcher.remove().await.unwrap_err()).is_equal_to(zk::Error::NoWatcher);
create.await.unwrap();

let (_, _, data_watcher) = client.get_and_watch_data("/a").await.unwrap();
let delete = client.delete("/a", None);
tokio::task::yield_now().await;
assert_that!(data_watcher.remove().await.unwrap_err()).is_equal_to(zk::Error::NoWatcher);
delete.await.unwrap();
}

#[test_log::test(tokio::test)]
async fn test_session_event() {
let docker = DockerCli::default();
Expand Down
Loading