From 59efd8de8087edc206c8a0cad9249fe87d9d271a Mon Sep 17 00:00:00 2001 From: clux Date: Thu, 23 May 2024 11:53:32 +0100 Subject: [PATCH] fmt Signed-off-by: clux --- kube-runtime/src/reflector/mod.rs | 3 +- kube-runtime/src/watcher.rs | 89 ++++++++++++------------------- 2 files changed, 34 insertions(+), 58 deletions(-) diff --git a/kube-runtime/src/reflector/mod.rs b/kube-runtime/src/reflector/mod.rs index 34f19d36c..ff39dd83d 100644 --- a/kube-runtime/src/reflector/mod.rs +++ b/kube-runtime/src/reflector/mod.rs @@ -12,8 +12,7 @@ use crate::watcher; use async_stream::stream; use futures::{Stream, StreamExt}; use std::hash::Hash; -#[cfg(feature = "unstable-runtime-subscribe")] -pub use store::store_shared; +#[cfg(feature = "unstable-runtime-subscribe")] pub use store::store_shared; pub use store::{store, Store}; /// Cache objects from a [`watcher()`] stream into a local [`Store`] diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index c4a02e5a2..833c2f733 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -496,19 +496,15 @@ where match api.list(&lp).await { Ok(list) => { if let Some(continue_token) = list.metadata.continue_.filter(|s| !s.is_empty()) { - ( - Some(Ok(Event::InitPage(list.items))), - State::InitPage { - continue_token: Some(continue_token), - }, - ) + (Some(Ok(Event::InitPage(list.items))), State::InitPage { + continue_token: Some(continue_token), + }) } else if let Some(resource_version) = list.metadata.resource_version.filter(|s| !s.is_empty()) { - ( - Some(Ok(Event::InitPage(list.items))), - State::InitPageDone { resource_version }, - ) + (Some(Ok(Event::InitPage(list.items))), State::InitPageDone { + resource_version, + }) } else { (Some(Err(Error::NoResourceVersion)), State::Empty) } @@ -540,13 +536,10 @@ where Some(Ok(WatchEvent::Bookmark(bm))) => { let marks_initial_end = bm.metadata.annotations.contains_key("k8s.io/initial-events-end"); if marks_initial_end { - ( - Some(Ok(Event::Ready)), - State::Watching { - resource_version: bm.metadata.resource_version, - stream, - }, - ) + (Some(Ok(Event::Ready)), State::Watching { + resource_version: bm.metadata.resource_version, + stream, + }) } else { (None, State::InitialWatch { stream }) } @@ -578,23 +571,19 @@ where } State::InitListed { resource_version } => { match api.watch(&wc.to_watch_params(), &resource_version).await { - Ok(stream) => ( - None, - State::Watching { - resource_version, - stream, - }, - ), + Ok(stream) => (None, State::Watching { + resource_version, + stream, + }), Err(err) => { if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) { warn!("watch initlist error with 403: {err:?}"); } else { debug!("watch initlist error: {err:?}"); } - ( - Some(Err(Error::WatchStartFailed(err))), - State::InitListed { resource_version }, - ) + (Some(Err(Error::WatchStartFailed(err))), State::InitListed { + resource_version, + }) } } } @@ -607,13 +596,10 @@ where if resource_version.is_empty() { (Some(Err(Error::NoResourceVersion)), State::default()) } else { - ( - Some(Ok(Event::Apply(obj))), - State::Watching { - resource_version, - stream, - }, - ) + (Some(Ok(Event::Apply(obj))), State::Watching { + resource_version, + stream, + }) } } Some(Ok(WatchEvent::Deleted(obj))) => { @@ -621,22 +607,16 @@ where if resource_version.is_empty() { (Some(Err(Error::NoResourceVersion)), State::default()) } else { - ( - Some(Ok(Event::Delete(obj))), - State::Watching { - resource_version, - stream, - }, - ) + (Some(Ok(Event::Delete(obj))), State::Watching { + resource_version, + stream, + }) } } - Some(Ok(WatchEvent::Bookmark(bm))) => ( - None, - State::Watching { - resource_version: bm.metadata.resource_version, - stream, - }, - ), + Some(Ok(WatchEvent::Bookmark(bm))) => (None, State::Watching { + resource_version: bm.metadata.resource_version, + stream, + }), Some(Ok(WatchEvent::Error(err))) => { // HTTP GONE, means we have desynced and need to start over and re-list :( let new_state = if err.code == 410 { @@ -660,13 +640,10 @@ where } else { debug!("watcher error: {err:?}"); } - ( - Some(Err(Error::WatchFailed(err))), - State::Watching { - resource_version, - stream, - }, - ) + (Some(Err(Error::WatchFailed(err))), State::Watching { + resource_version, + stream, + }) } None => (None, State::InitListed { resource_version }), },