Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return a Stream<Item=Result<T>> from request_events #92

Merged
merged 25 commits into from
Nov 29, 2019
Merged
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
db495fd
Converted events to a stream
kitmoog Nov 22, 2019
e56a7d1
Added warn log on error
kitmoog Nov 22, 2019
5892d01
Merge branch 'master' into event-streams
kitmoog Nov 22, 2019
9c5983b
Stream in the informer
kitmoog Nov 22, 2019
7bb2f9c
Convert informer::poll to return a Stream
kitmoog Nov 22, 2019
7c215e5
Avoid `collect` in informer::single_watch and just handle stream
kitmoog Nov 22, 2019
2a5174e
Updated examples to short-circuit
kitmoog Nov 22, 2019
1b6d853
Fixed `unwrap_or` type
kitmoog Nov 22, 2019
afce6a5
SLightly tidied examples
kitmoog Nov 23, 2019
4e0e4b9
First draft of double buffering the stream
kitmoog Nov 23, 2019
26b3e43
Removed commented out code
kitmoog Nov 23, 2019
cf0c029
Comment typo
kitmoog Nov 23, 2019
5712a12
Attempt to parse events on newline
kitmoog Nov 23, 2019
8b70b03
Added missing comma to match arm
kitmoog Nov 23, 2019
f650451
Better parse condition with valid JSON
kitmoog Nov 25, 2019
dfdd61f
Handle partial updates from server in request_events
kitmoog Nov 25, 2019
2fa84a3
Commented unfold method more thoroughly
kitmoog Nov 25, 2019
cf31d96
Removed Send requirement and used flatten
kitmoog Nov 25, 2019
1631060
Remove unused event queue
kitmoog Nov 25, 2019
b1bd334
Handle partial events in same loop
kitmoog Nov 25, 2019
88ca6ec
Cancel stream on desync error
kitmoog Nov 28, 2019
a5e7e0a
Had the error boolean backwards in the take_while
kitmoog Nov 28, 2019
500950f
Simplified resync case without custom error
kitmoog Nov 29, 2019
6a5988f
rustfmt'd
kitmoog Nov 29, 2019
26ec8a0
Correctly handle 410 watch error
kitmoog Nov 29, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 32 additions & 36 deletions src/api/informer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::api::{Api, ListParams, RawApi, Void};
use crate::client::APIClient;
use crate::Result;

use futures::StreamExt;
use futures::{Stream, StreamExt};
use futures_timer::Delay;
use serde::de::DeserializeOwned;
use std::{
Expand Down Expand Up @@ -128,11 +128,33 @@ where
/// If handling all the events is too time consuming, you probably need a queue.
pub async fn poll(&self) -> Result<()> {
trace!("Watching {:?}", self.resource);
match self.single_watch().await {
Ok((events, newver)) => {
*self.version.write().unwrap() = newver;
for e in events {
self.events.write().unwrap().push_back(e);
let events = self.single_watch().await.map(|stream| stream.boxed());
clux marked this conversation as resolved.
Show resolved Hide resolved

match events {
Ok(mut events) => {
let mut new_version = None;

// Stream over each event, updating new_version if necessary
// and enqueuing the event
while let Some(Ok(event)) = events.next().await {
// check if we should consider the version
if let WatchEvent::Added(o) = &event {
new_version = o.meta().resourceVersion.clone();
} else if let WatchEvent::Modified(o) = &event {
new_version = o.meta().resourceVersion.clone();
} else if let WatchEvent::Deleted(o) = &event {
new_version = o.meta().resourceVersion.clone();
}

// And enqueue it
self.events.write().unwrap().push_back(event);
}

// Update our version need be
// Follow docs conventions and store the last resourceVersion
// https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
if let Some(version) = new_version {
*self.version.write().unwrap() = version;
}
}
kitfre marked this conversation as resolved.
Show resolved Hide resolved
Err(e) => {
Expand All @@ -142,7 +164,8 @@ where
Delay::new(dur).await;
self.reset().await?;
}
};
}

Ok(())
}

Expand Down Expand Up @@ -181,36 +204,9 @@ where
}

/// Watch helper
async fn single_watch(&self) -> Result<(Vec<WatchEvent<K>>, String)> {
async fn single_watch(&self) -> Result<impl Stream<Item = Result<WatchEvent<K>>>> {
let oldver = self.version();
let req = self.resource.watch(&self.params, &oldver)?;
let events = self
.client
.request_events::<WatchEvent<K>>(req)
.await?
.filter_map(|e| async move { e.ok() })
.collect::<Vec<_>>()
.await;

// Follow docs conventions and store the last resourceVersion
// https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
let newver = events
.iter()
.filter_map(|e| match e {
WatchEvent::Added(o) => o.meta().resourceVersion.clone(),
WatchEvent::Modified(o) => o.meta().resourceVersion.clone(),
WatchEvent::Deleted(o) => o.meta().resourceVersion.clone(),
_ => None,
})
.last()
.unwrap_or(oldver);
debug!(
"Got {} {} events, resourceVersion={}",
events.len(),
self.resource.resource,
newver
);

Ok((events, newver))
clux marked this conversation as resolved.
Show resolved Hide resolved
Ok(self.client.request_events::<WatchEvent<K>>(req).await?)
}
}