Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Downgrade Dashmap to 3.11.7 to fix store duplicates #287

Merged
merged 4 commits into from
Jul 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion kube-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ smallvec = "1.4.0"
pin-project = "0.4.16"
tokio = { version = "0.2.21", features = ["time"] }
snafu = { version = "0.6.8", features = ["futures"] }
dashmap = "4.0.0-rc6"
dashmap = "3.11.7"

[features]
default = ["native-tls"]
Expand All @@ -39,3 +39,4 @@ kube-derive = { path = "../kube-derive" }
#kube-derive = "0.37.0"
serde_json = "1.0.53"
tokio = { version = "0.2.21", features = ["full", "test-util"] }
rand = "0.7.3"
47 changes: 45 additions & 2 deletions kube-runtime/src/reflector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@ pub fn reflector<K: Meta + Clone, W: Stream<Item = watcher::Result<watcher::Even
mod tests {
use super::{reflector, store, ObjectRef};
use crate::watcher;
use futures::{stream, StreamExt};
use futures::{stream, StreamExt, TryStreamExt};
use k8s_openapi::{api::core::v1::ConfigMap, apimachinery::pkg::apis::meta::v1::ObjectMeta};
use std::collections::BTreeMap;
use rand::{
distributions::{Bernoulli, Uniform},
Rng,
};
use std::collections::{BTreeMap, HashMap};

#[tokio::test]
async fn reflector_applied_should_add_object() {
Expand Down Expand Up @@ -140,4 +144,43 @@ mod tests {
assert_eq!(store.get(&ObjectRef::from_obj(&cm_a)), None);
assert_eq!(store.get(&ObjectRef::from_obj(&cm_b)), Some(cm_b));
}

#[tokio::test]
async fn reflector_store_should_not_contain_duplicates() {
let mut rng = rand::thread_rng();
let item_dist = Uniform::new(0u8, 100);
let deleted_dist = Bernoulli::new(0.40).unwrap();
let store_w = store::Writer::default();
let store = store_w.as_reader();
reflector(
store_w,
stream::iter((0u32..100000).map(|gen| {
let item = rng.sample(item_dist);
let deleted = rng.sample(deleted_dist);
let obj = ConfigMap {
metadata: ObjectMeta {
name: Some(item.to_string()),
resource_version: Some(gen.to_string()),
..ObjectMeta::default()
},
..ConfigMap::default()
};
Ok(if deleted {
watcher::Event::Deleted(obj)
} else {
watcher::Event::Applied(obj)
})
})),
)
.map_ok(|_| ())
.try_collect::<()>()
.await
.unwrap();

let mut seen_objects = HashMap::new();
for obj in store.state() {
assert_eq!(seen_objects.get(obj.metadata.name.as_ref().unwrap()), None);
seen_objects.insert(obj.metadata.name.clone().unwrap(), obj);
}
}
}
10 changes: 0 additions & 10 deletions kube-runtime/src/reflector/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,4 @@ impl<K: 'static + Clone + Resource> Store<K> {
pub fn state(&self) -> Vec<K> {
self.store.iter().map(|eg| eg.value().clone()).collect()
}

/// Return a guarded dashmap iterator of our state
///
/// This creates an iterator over all entries in the map.
/// This does not take a snapshot of the map and thus changes during the lifetime
/// of the iterator may or may not become visible in the iterator.
#[must_use]
pub fn iter(&self) -> dashmap::Iter<ObjectRef<K>, K> {
self.store.iter()
}
}
5 changes: 3 additions & 2 deletions kube/examples/configmap_reflector.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#[macro_use] extern crate log;
#[macro_use]
extern crate log;
use futures::{StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::ConfigMap;
use kube::{
Expand All @@ -12,7 +13,7 @@ fn spawn_periodic_reader(reader: Store<ConfigMap>) {
loop {
// Periodically read our state
tokio::time::delay_for(std::time::Duration::from_secs(5)).await;
let cms: Vec<_> = reader.iter().map(|eg| Meta::name(eg.value()).clone()).collect();
let cms: Vec<_> = reader.state().iter().map(|obj| Meta::name(obj).clone()).collect();
info!("Current configmaps: {:?}", cms);
}
});
Expand Down
25 changes: 25 additions & 0 deletions kube/examples/pod_reflector_live.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use color_eyre::Result;
use futures::prelude::*;
use k8s_openapi::api::core::v1::Pod;
use kube::{api::ListParams, Api, Client, Config};
use kube_runtime::{reflector, watcher};

#[tokio::main]
async fn main() -> Result<()> {
let config = Config::infer().await?;
let client = Client::new(config);
let namespace = std::env::var("NAMESPACE").unwrap_or("default".into());

let api: Api<Pod> = Api::namespaced(client, &namespace);
let store_w = reflector::store::Writer::default();
let store = store_w.as_reader();
let reflector = reflector(store_w, watcher(api, ListParams::default()));
// Use try_for_each to fail on first error, use for_each to keep retrying
reflector
.try_for_each(|_event| async {
println!("Current pod count: {}", store.state().len());
Ok(())
})
.await?;
Ok(())
}