Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return a Stream<Item=Result<T>> from request_events #92

Merged
merged 25 commits into from
Nov 29, 2019
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
db495fd
Converted events to a stream
kitmoog Nov 22, 2019
e56a7d1
Added warn log on error
kitmoog Nov 22, 2019
5892d01
Merge branch 'master' into event-streams
kitmoog Nov 22, 2019
9c5983b
Stream in the informer
kitmoog Nov 22, 2019
7bb2f9c
Convert informer::poll to return a Stream
kitmoog Nov 22, 2019
7c215e5
Avoid `collect` in informer::single_watch and just handle stream
kitmoog Nov 22, 2019
2a5174e
Updated examples to short-circuit
kitmoog Nov 22, 2019
1b6d853
Fixed `unwrap_or` type
kitmoog Nov 22, 2019
afce6a5
SLightly tidied examples
kitmoog Nov 23, 2019
4e0e4b9
First draft of double buffering the stream
kitmoog Nov 23, 2019
26b3e43
Removed commented out code
kitmoog Nov 23, 2019
cf0c029
Comment typo
kitmoog Nov 23, 2019
5712a12
Attempt to parse events on newline
kitmoog Nov 23, 2019
8b70b03
Added missing comma to match arm
kitmoog Nov 23, 2019
f650451
Better parse condition with valid JSON
kitmoog Nov 25, 2019
dfdd61f
Handle partial updates from server in request_events
kitmoog Nov 25, 2019
2fa84a3
Commented unfold method more thoroughly
kitmoog Nov 25, 2019
cf31d96
Removed Send requirement and used flatten
kitmoog Nov 25, 2019
1631060
Remove unused event queue
kitmoog Nov 25, 2019
b1bd334
Handle partial events in same loop
kitmoog Nov 25, 2019
88ca6ec
Cancel stream on desync error
kitmoog Nov 28, 2019
a5e7e0a
Had the error boolean backwards in the take_while
kitmoog Nov 28, 2019
500950f
Simplified resync case without custom error
kitmoog Nov 29, 2019
6a5988f
rustfmt'd
kitmoog Nov 29, 2019
26ec8a0
Correctly handle 410 watch error
kitmoog Nov 29, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ time = "0.1.42"
either = "1.5.3"
thiserror = "1.0.5"
futures-timer = "2.0.0"
futures = "0.3.1"

[features]
default = []
Expand Down
21 changes: 12 additions & 9 deletions examples/event_informer.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
#[macro_use] extern crate log;
#[macro_use]
extern crate log;
use kube::{
api::{Api, Informer, WatchEvent},
api::v1Event,
api::{Api, Informer, WatchEvent},
client::APIClient,
config,
};

use futures::StreamExt;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "info,kube=trace");
Expand All @@ -14,13 +17,13 @@ async fn main() -> anyhow::Result<()> {
let client = APIClient::new(config);

let events = Api::v1Event(client);
let ei = Informer::new(events)
.init().await?;
let ei = Informer::new(events).init().await?;

loop {
ei.poll().await?;
let mut events = ei.poll().await?.boxed();

while let Some(event) = ei.pop() {
while let Some(event) = events.next().await {
let event = event?;
handle_events(event)?;
}
}
Expand All @@ -31,13 +34,13 @@ fn handle_events(ev: WatchEvent<v1Event>) -> anyhow::Result<()> {
match ev {
WatchEvent::Added(o) => {
info!("New Event: {}, {}", o.type_, o.message);
},
}
WatchEvent::Modified(o) => {
info!("Modified Event: {}", o.reason);
},
}
WatchEvent::Deleted(o) => {
info!("Deleted Event: {}", o.message);
},
}
WatchEvent::Error(e) => {
warn!("Error event: {:?}", e);
}
Expand Down
48 changes: 21 additions & 27 deletions examples/job_openapi.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#[macro_use] extern crate log;
#[macro_use]
extern crate log;
use futures::StreamExt;
use serde_json::json;

use kube::{
api::{Api, PostParams, DeleteParams, ListParams, WatchEvent},
client::{APIClient},
api::{Api, DeleteParams, ListParams, PostParams, WatchEvent},
client::APIClient,
config,
};

Expand Down Expand Up @@ -45,36 +47,28 @@ async fn main() -> anyhow::Result<()> {

// See if it ran to completion
let lp = ListParams::default();
jobs.watch(&lp, "").await.and_then(|res| {
for status in res {
match status {
WatchEvent::Added(s) => {
info!("Added {}", s.metadata.name);
},
WatchEvent::Modified(s) => {
let current_status = s.status.clone().expect("Status is missing");
current_status.completion_time.and_then(|_| {
info!("Modified: {} is complete", s.metadata.name);
Some(())
}).or_else(|| {
info!("Modified: {} is running", s.metadata.name);
Some(())
});
},
WatchEvent::Deleted(s) => {
info!("Deleted {}", s.metadata.name);
}
WatchEvent::Error(s) => {
error!("{}", s);
let mut stream = jobs.watch(&lp, "").await?.boxed();

while let Some(status) = stream.next().await {
match status {
WatchEvent::Added(s) => info!("Added {}", s.metadata.name),
WatchEvent::Modified(s) => {
let current_status = s.status.clone().expect("Status is missing");
match current_status.completion_time {
Some(_) => info!("Modified: {} is complete", s.metadata.name),
_ => info!("Modified: {} is running", s.metadata.name),
}
}
WatchEvent::Deleted(s) => info!("Deleted {}", s.metadata.name),
WatchEvent::Error(s) => error!("{}", s),
}
Ok(())
}).expect("Failed to watch");
}

// Clean up the old job record..
info!("Deleting the job record.");
let dp = DeleteParams::default();
jobs.delete("empty-job", &dp).await.expect("failed to delete job");
jobs.delete("empty-job", &dp)
.await
.expect("failed to delete job");
Ok(())
}
53 changes: 35 additions & 18 deletions examples/node_informer.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
#[macro_use] extern crate log;
#[macro_use]
extern crate log;
use futures::StreamExt;
use k8s_openapi::api::core::v1::{NodeSpec, NodeStatus};
use kube::{
api::{RawApi, Api, v1Event, Informer, ListParams, WatchEvent, Object},
api::{v1Event, Api, Informer, ListParams, Object, RawApi, WatchEvent},
client::APIClient,
config,
};
use k8s_openapi::api::core::v1::{NodeSpec, NodeStatus};

type Node = Object<NodeSpec, NodeStatus>;
type Event = v1Event; // snowflake obj
Expand All @@ -20,12 +22,14 @@ async fn main() -> anyhow::Result<()> {
let events = Api::v1Event(client.clone());
let ni = Informer::raw(client.clone(), nodes)
.labels("beta.kubernetes.io/os=linux")
.init().await?;
.init()
.await?;

loop {
ni.poll().await?;
let mut nodes = ni.poll().await?.boxed();

while let Some(ne) = ni.pop() {
while let Some(ne) = nodes.next().await {
let ne = ne?;
handle_nodes(&events, ne).await?;
}
}
Expand All @@ -36,19 +40,30 @@ async fn handle_nodes(events: &Api<Event>, ne: WatchEvent<Node>) -> anyhow::Resu
match ne {
WatchEvent::Added(o) => {
info!("New Node: {}", o.spec.provider_id.unwrap());
},
}
WatchEvent::Modified(o) => {
// Nodes often modify a lot - only print broken nodes
if let Some(true) = o.spec.unschedulable {
let failed = o.status.unwrap().conditions.unwrap().into_iter().filter(|c| {
// In a failed state either some of the extra conditions are not False
// Or the Ready state is False
(c.status == "True" && c.type_ != "Ready") ||
(c.status == "False" && c.type_ == "Ready")
}).map(|c| c.message).collect::<Vec<_>>(); // failed statuses
let failed = o
.status
.unwrap()
.conditions
.unwrap()
.into_iter()
.filter(|c| {
// In a failed state either some of the extra conditions are not False
// Or the Ready state is False
(c.status == "True" && c.type_ != "Ready")
|| (c.status == "False" && c.type_ == "Ready")
})
.map(|c| c.message)
.collect::<Vec<_>>(); // failed statuses
warn!("Unschedulable Node: {}, ({:?})", o.metadata.name, failed);
// Find events related to this node
let sel = format!("involvedObject.kind=Node,involvedObject.name={}", o.metadata.name);
let sel = format!(
"involvedObject.kind=Node,involvedObject.name={}",
o.metadata.name
);
let opts = ListParams {
field_selector: Some(sel),
..Default::default()
Expand All @@ -61,14 +76,16 @@ async fn handle_nodes(events: &Api<Event>, ne: WatchEvent<Node>) -> anyhow::Resu
// Turn up logging above to see
debug!("Normal node: {}", o.metadata.name);
}
},
}
WatchEvent::Deleted(o) => {
warn!("Deleted node: {} ({:?}) running {:?} with labels: {:?}",
o.metadata.name, o.spec.provider_id.unwrap(),
warn!(
"Deleted node: {} ({:?}) running {:?} with labels: {:?}",
o.metadata.name,
o.spec.provider_id.unwrap(),
o.status.unwrap().conditions.unwrap(),
o.metadata.labels,
);
},
}
WatchEvent::Error(e) => {
warn!("Error event: {:?}", e);
}
Expand Down
38 changes: 26 additions & 12 deletions examples/pod_informer.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#[macro_use] extern crate log;
use std::env;
#[macro_use]
extern crate log;
use futures::StreamExt;
use k8s_openapi::api::core::v1::{PodSpec, PodStatus};
use kube::{
api::{Api, Informer, WatchEvent, Object},
api::{Api, Informer, Object, WatchEvent},
client::APIClient,
config,
};
use k8s_openapi::api::core::v1::{PodSpec, PodStatus};
use std::env;
type Pod = Object<PodSpec, PodStatus>;

#[tokio::main]
Expand All @@ -22,10 +24,11 @@ async fn main() -> anyhow::Result<()> {
// Here we both poll and reconcile based on events from the main thread
// If you run this next to actix-web (say), spawn a thread and pass `inf` as app state
loop {
inf.poll().await?;
let mut pods = inf.poll().await?.boxed();

// Handle events one by one, draining the informer
while let Some(event) = inf.pop() {
while let Some(event) = pods.next().await {
let event = event?;
handle_node(&resource, event)?;
}
}
Expand All @@ -35,17 +38,28 @@ async fn main() -> anyhow::Result<()> {
fn handle_node(_pods: &Api<Pod>, ev: WatchEvent<Pod>) -> anyhow::Result<()> {
match ev {
WatchEvent::Added(o) => {
let containers = o.spec.containers.into_iter().map(|c| c.name).collect::<Vec<_>>();
info!("Added Pod: {} (containers={:?})", o.metadata.name, containers);
},
let containers = o
.spec
.containers
.into_iter()
.map(|c| c.name)
.collect::<Vec<_>>();
info!(
"Added Pod: {} (containers={:?})",
o.metadata.name, containers
);
}
WatchEvent::Modified(o) => {
let phase = o.status.unwrap().phase.unwrap();
let owner = &o.metadata.ownerReferences[0];
info!("Modified Pod: {} (phase={}, owner={})", o.metadata.name, phase, owner.name);
},
info!(
"Modified Pod: {} (phase={}, owner={})",
o.metadata.name, phase, owner.name
);
}
WatchEvent::Deleted(o) => {
info!("Deleted Pod: {}", o.metadata.name);
},
}
WatchEvent::Error(e) => {
warn!("Error event: {:?}", e);
}
Expand Down
Loading