diff --git a/Cargo.toml b/Cargo.toml index c07c238ed..573c46952 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ time = "0.1.42" either = "1.5.3" thiserror = "1.0.5" futures-timer = "2.0.0" +futures = "0.3.1" [features] default = [] diff --git a/examples/event_informer.rs b/examples/event_informer.rs index b6c4a0254..726214bd3 100644 --- a/examples/event_informer.rs +++ b/examples/event_informer.rs @@ -1,11 +1,14 @@ -#[macro_use] extern crate log; +#[macro_use] +extern crate log; use kube::{ - api::{Api, Informer, WatchEvent}, api::v1Event, + api::{Api, Informer, WatchEvent}, client::APIClient, config, }; +use futures::StreamExt; + #[tokio::main] async fn main() -> anyhow::Result<()> { std::env::set_var("RUST_LOG", "info,kube=trace"); @@ -14,13 +17,13 @@ async fn main() -> anyhow::Result<()> { let client = APIClient::new(config); let events = Api::v1Event(client); - let ei = Informer::new(events) - .init().await?; + let ei = Informer::new(events).init().await?; loop { - ei.poll().await?; + let mut events = ei.poll().await?.boxed(); - while let Some(event) = ei.pop() { + while let Some(event) = events.next().await { + let event = event?; handle_events(event)?; } } @@ -31,13 +34,13 @@ fn handle_events(ev: WatchEvent) -> anyhow::Result<()> { match ev { WatchEvent::Added(o) => { info!("New Event: {}, {}", o.type_, o.message); - }, + } WatchEvent::Modified(o) => { info!("Modified Event: {}", o.reason); - }, + } WatchEvent::Deleted(o) => { info!("Deleted Event: {}", o.message); - }, + } WatchEvent::Error(e) => { warn!("Error event: {:?}", e); } diff --git a/examples/job_openapi.rs b/examples/job_openapi.rs index 1187e3f81..66d31e328 100644 --- a/examples/job_openapi.rs +++ b/examples/job_openapi.rs @@ -1,9 +1,11 @@ -#[macro_use] extern crate log; +#[macro_use] +extern crate log; +use futures::StreamExt; use serde_json::json; use kube::{ - api::{Api, PostParams, DeleteParams, ListParams, WatchEvent}, - client::{APIClient}, + api::{Api, DeleteParams, ListParams, PostParams, WatchEvent}, + client::APIClient, config, }; @@ -45,36 +47,28 @@ async fn main() -> anyhow::Result<()> { // See if it ran to completion let lp = ListParams::default(); - jobs.watch(&lp, "").await.and_then(|res| { - for status in res { - match status { - WatchEvent::Added(s) => { - info!("Added {}", s.metadata.name); - }, - WatchEvent::Modified(s) => { - let current_status = s.status.clone().expect("Status is missing"); - current_status.completion_time.and_then(|_| { - info!("Modified: {} is complete", s.metadata.name); - Some(()) - }).or_else(|| { - info!("Modified: {} is running", s.metadata.name); - Some(()) - }); - }, - WatchEvent::Deleted(s) => { - info!("Deleted {}", s.metadata.name); - } - WatchEvent::Error(s) => { - error!("{}", s); + let mut stream = jobs.watch(&lp, "").await?.boxed(); + + while let Some(status) = stream.next().await { + match status { + WatchEvent::Added(s) => info!("Added {}", s.metadata.name), + WatchEvent::Modified(s) => { + let current_status = s.status.clone().expect("Status is missing"); + match current_status.completion_time { + Some(_) => info!("Modified: {} is complete", s.metadata.name), + _ => info!("Modified: {} is running", s.metadata.name), } } + WatchEvent::Deleted(s) => info!("Deleted {}", s.metadata.name), + WatchEvent::Error(s) => error!("{}", s), } - Ok(()) - }).expect("Failed to watch"); + } // Clean up the old job record.. info!("Deleting the job record."); let dp = DeleteParams::default(); - jobs.delete("empty-job", &dp).await.expect("failed to delete job"); + jobs.delete("empty-job", &dp) + .await + .expect("failed to delete job"); Ok(()) } diff --git a/examples/node_informer.rs b/examples/node_informer.rs index 6142540f7..c43dd0943 100644 --- a/examples/node_informer.rs +++ b/examples/node_informer.rs @@ -1,10 +1,12 @@ -#[macro_use] extern crate log; +#[macro_use] +extern crate log; +use futures::StreamExt; +use k8s_openapi::api::core::v1::{NodeSpec, NodeStatus}; use kube::{ - api::{RawApi, Api, v1Event, Informer, ListParams, WatchEvent, Object}, + api::{v1Event, Api, Informer, ListParams, Object, RawApi, WatchEvent}, client::APIClient, config, }; -use k8s_openapi::api::core::v1::{NodeSpec, NodeStatus}; type Node = Object; type Event = v1Event; // snowflake obj @@ -20,12 +22,14 @@ async fn main() -> anyhow::Result<()> { let events = Api::v1Event(client.clone()); let ni = Informer::raw(client.clone(), nodes) .labels("beta.kubernetes.io/os=linux") - .init().await?; + .init() + .await?; loop { - ni.poll().await?; + let mut nodes = ni.poll().await?.boxed(); - while let Some(ne) = ni.pop() { + while let Some(ne) = nodes.next().await { + let ne = ne?; handle_nodes(&events, ne).await?; } } @@ -36,19 +40,30 @@ async fn handle_nodes(events: &Api, ne: WatchEvent) -> anyhow::Resu match ne { WatchEvent::Added(o) => { info!("New Node: {}", o.spec.provider_id.unwrap()); - }, + } WatchEvent::Modified(o) => { // Nodes often modify a lot - only print broken nodes if let Some(true) = o.spec.unschedulable { - let failed = o.status.unwrap().conditions.unwrap().into_iter().filter(|c| { - // In a failed state either some of the extra conditions are not False - // Or the Ready state is False - (c.status == "True" && c.type_ != "Ready") || - (c.status == "False" && c.type_ == "Ready") - }).map(|c| c.message).collect::>(); // failed statuses + let failed = o + .status + .unwrap() + .conditions + .unwrap() + .into_iter() + .filter(|c| { + // In a failed state either some of the extra conditions are not False + // Or the Ready state is False + (c.status == "True" && c.type_ != "Ready") + || (c.status == "False" && c.type_ == "Ready") + }) + .map(|c| c.message) + .collect::>(); // failed statuses warn!("Unschedulable Node: {}, ({:?})", o.metadata.name, failed); // Find events related to this node - let sel = format!("involvedObject.kind=Node,involvedObject.name={}", o.metadata.name); + let sel = format!( + "involvedObject.kind=Node,involvedObject.name={}", + o.metadata.name + ); let opts = ListParams { field_selector: Some(sel), ..Default::default() @@ -61,14 +76,16 @@ async fn handle_nodes(events: &Api, ne: WatchEvent) -> anyhow::Resu // Turn up logging above to see debug!("Normal node: {}", o.metadata.name); } - }, + } WatchEvent::Deleted(o) => { - warn!("Deleted node: {} ({:?}) running {:?} with labels: {:?}", - o.metadata.name, o.spec.provider_id.unwrap(), + warn!( + "Deleted node: {} ({:?}) running {:?} with labels: {:?}", + o.metadata.name, + o.spec.provider_id.unwrap(), o.status.unwrap().conditions.unwrap(), o.metadata.labels, ); - }, + } WatchEvent::Error(e) => { warn!("Error event: {:?}", e); } diff --git a/examples/pod_informer.rs b/examples/pod_informer.rs index 176b80d60..7da0fcdbf 100644 --- a/examples/pod_informer.rs +++ b/examples/pod_informer.rs @@ -1,11 +1,13 @@ -#[macro_use] extern crate log; -use std::env; +#[macro_use] +extern crate log; +use futures::StreamExt; +use k8s_openapi::api::core::v1::{PodSpec, PodStatus}; use kube::{ - api::{Api, Informer, WatchEvent, Object}, + api::{Api, Informer, Object, WatchEvent}, client::APIClient, config, }; -use k8s_openapi::api::core::v1::{PodSpec, PodStatus}; +use std::env; type Pod = Object; #[tokio::main] @@ -22,10 +24,11 @@ async fn main() -> anyhow::Result<()> { // Here we both poll and reconcile based on events from the main thread // If you run this next to actix-web (say), spawn a thread and pass `inf` as app state loop { - inf.poll().await?; + let mut pods = inf.poll().await?.boxed(); // Handle events one by one, draining the informer - while let Some(event) = inf.pop() { + while let Some(event) = pods.next().await { + let event = event?; handle_node(&resource, event)?; } } @@ -35,17 +38,28 @@ async fn main() -> anyhow::Result<()> { fn handle_node(_pods: &Api, ev: WatchEvent) -> anyhow::Result<()> { match ev { WatchEvent::Added(o) => { - let containers = o.spec.containers.into_iter().map(|c| c.name).collect::>(); - info!("Added Pod: {} (containers={:?})", o.metadata.name, containers); - }, + let containers = o + .spec + .containers + .into_iter() + .map(|c| c.name) + .collect::>(); + info!( + "Added Pod: {} (containers={:?})", + o.metadata.name, containers + ); + } WatchEvent::Modified(o) => { let phase = o.status.unwrap().phase.unwrap(); let owner = &o.metadata.ownerReferences[0]; - info!("Modified Pod: {} (phase={}, owner={})", o.metadata.name, phase, owner.name); - }, + info!( + "Modified Pod: {} (phase={}, owner={})", + o.metadata.name, phase, owner.name + ); + } WatchEvent::Deleted(o) => { info!("Deleted Pod: {}", o.metadata.name); - }, + } WatchEvent::Error(e) => { warn!("Error event: {:?}", e); } diff --git a/src/api/informer.rs b/src/api/informer.rs index 085071339..cb5962b0a 100644 --- a/src/api/informer.rs +++ b/src/api/informer.rs @@ -1,27 +1,16 @@ -use crate::api::{ - RawApi, - Api, - ListParams, - Void, -}; -use crate::api::resource::{ - ObjectList, - WatchEvent, - KubeObject, -}; +use crate::api::resource::{KubeObject, ObjectList, WatchEvent}; +use crate::api::{Api, ListParams, RawApi, Void}; use crate::client::APIClient; -use crate::{Result}; +use crate::Result; -use serde::de::DeserializeOwned; +use futures::{Stream, StreamExt}; use futures_timer::Delay; +use serde::de::DeserializeOwned; use std::{ - collections::VecDeque, sync::{Arc, RwLock}, - time::{Duration}, + time::Duration, }; -type WatchQueue = VecDeque>; - /// An event informer for a `Resource` /// /// This watches a `Resource`, by: @@ -32,17 +21,20 @@ type WatchQueue = VecDeque>; /// It caches WatchEvent internally in a queue when polling. /// A user should drain this queue periodically. #[derive(Clone)] -pub struct Informer where - K: Clone + DeserializeOwned + KubeObject +pub struct Informer +where + K: Clone + DeserializeOwned + KubeObject, { - events: Arc>>, version: Arc>, client: APIClient, resource: RawApi, params: ListParams, + needs_resync: Arc>, + _object_type: std::marker::PhantomData, } -impl Informer where +impl Informer +where K: Clone + DeserializeOwned + KubeObject, { /// Create a reflector with a kube client on a kube resource @@ -51,13 +43,15 @@ impl Informer where client: r.client, resource: r.api, params: ListParams::default(), - events: Arc::new(RwLock::new(VecDeque::new())), version: Arc::new(RwLock::new(0.to_string())), + needs_resync: Arc::new(RwLock::new(false)), + _object_type: std::marker::PhantomData, } } } -impl Informer where +impl Informer +where K: Clone + DeserializeOwned + KubeObject, { /// Create a reflector with a kube client on a kube resource @@ -66,8 +60,9 @@ impl Informer where client, resource: r, params: ListParams::default(), - events: Arc::new(RwLock::new(VecDeque::new())), version: Arc::new(RwLock::new(0.to_string())), + needs_resync: Arc::new(RwLock::new(false)), + _object_type: std::marker::PhantomData, } } @@ -126,35 +121,82 @@ impl Informer where self } - /// Run a single watch poll /// /// If this returns an error, it resets the resourceVersion. /// This is meant to be run continually and events are meant to be handled between. - /// If handling all the events is too time consuming, you probably need a queue. - pub async fn poll(&self) -> Result<()> { + /// poll returns a Stream so events can be handled asynchronously + pub async fn poll(&self) -> Result>>> { trace!("Watching {:?}", self.resource); - match self.single_watch().await { - Ok((events, newver)) => { - *self.version.write().unwrap() = newver; - for e in events { - self.events.write().unwrap().push_back(e); - } - }, + + // First check if we need to resync, if so reset our resource version + // and wait a bit before proceeding. + // We take a read only lock here as most of the time it will not be necessary + // to take an exclusive lock. + if *self.needs_resync.read().unwrap() { + // If desynched due to mismatching resourceVersion, retry in a bit + let dur = Duration::from_secs(10); + Delay::new(dur).await; + self.reset().await?; + *self.needs_resync.write().unwrap() = false; + } + + // Create our watch request + let req = self.resource.watch(&self.params, &self.version())?; + + // Clone our version so we can move it into the Stream handling + // and avoid a 'static lifetime on self + let version = self.version.clone(); + + // Clone our resync flag similarly + let needs_resync = self.needs_resync.clone(); + + // Attempt to fetch our stream + let stream = self.client.request_events::>(req).await; + + match stream { + Ok(events) => { + // Add a map stage to the stream which will update our version + // based on each incoming event + Ok(events.map(move |event| { + // Check if we need to update our version based on the incoming events + let new_version = match &event { + Ok(WatchEvent::Added(o)) + | Ok(WatchEvent::Modified(o)) + | Ok(WatchEvent::Deleted(o)) => o.meta().resourceVersion.clone(), + Ok(WatchEvent::Error(e)) => { + // If we hit a 410 desync error, mark that we need to resync on the next call + if e.code == 410 { + warn!("Stream desynced: {:?}", e); + *needs_resync.write().unwrap() = true; + } + None + } + Err(e) => { + warn!("Unexpected watch error: {:?}", e); + None + } + }; + + // Update our version need be + // Follow docs conventions and store the last resourceVersion + // https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes + if let Some(nv) = new_version { + *version.write().unwrap() = nv; + } + + event + })) + } Err(e) => { warn!("Poll error: {:?}", e); - // If desynched due to mismatching resourceVersion, retry in a bit - let dur = Duration::from_secs(10); - Delay::new(dur).await; - self.reset().await?; + // Set that we need a resync for the next poll + // which will then reset our resource version and + // wait a bit + *self.needs_resync.write().unwrap() = false; + Err(e) } - }; - Ok(()) - } - - /// Pop an event from the front of the WatchQueue - pub fn pop(&self) -> Option> { - self.events.write().unwrap().pop_front() + } } /// Reset the resourceVersion to current and clear the event queue @@ -162,7 +204,6 @@ impl Informer where // Fetch a new initial version: let initial = self.get_resource_version().await?; *self.version.write().unwrap() = initial; - self.events.write().unwrap().clear(); Ok(()) } @@ -171,7 +212,6 @@ impl Informer where self.version.read().unwrap().clone() } - /// Init helper async fn get_resource_version(&self) -> Result { let req = self.resource.list_zero_resource_entries(&self.params)?; @@ -180,28 +220,10 @@ impl Informer where let res = self.client.request::>(req).await?; let version = res.metadata.resourceVersion.unwrap_or_else(|| "0".into()); - debug!("Got fresh resourceVersion={} for {}", version, self.resource.resource); - Ok( version ) - } - - /// Watch helper - async fn single_watch(&self) -> Result<(Vec>, String)> { - let oldver = self.version(); - let req = self.resource.watch(&self.params, &oldver)?; - let events = self.client.request_events::>(req).await?; - - // Follow docs conventions and store the last resourceVersion - // https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes - let newver = events.iter().filter_map(|e| { - match e { - WatchEvent::Added(o) => o.meta().resourceVersion.clone(), - WatchEvent::Modified(o) => o.meta().resourceVersion.clone(), - WatchEvent::Deleted(o) => o.meta().resourceVersion.clone(), - _ => None - } - }).last().unwrap_or(oldver); - debug!("Got {} {} events, resourceVersion={}", events.len(), self.resource.resource, newver); - - Ok((events, newver)) + debug!( + "Got fresh resourceVersion={} for {}", + version, self.resource.resource + ); + Ok(version) } } diff --git a/src/api/reflector.rs b/src/api/reflector.rs index d038af222..716f18d4a 100644 --- a/src/api/reflector.rs +++ b/src/api/reflector.rs @@ -1,18 +1,15 @@ -use crate::api::{RawApi, Api, ListParams, ObjectMeta}; -use crate::api::resource::{ - ObjectList, - WatchEvent, - KubeObject, -}; -use serde::de::DeserializeOwned; -use futures_timer::Delay; +use crate::api::resource::{KubeObject, ObjectList, WatchEvent}; +use crate::api::{Api, ListParams, ObjectMeta, RawApi}; use crate::client::APIClient; -use crate::{Result, Error}; +use crate::{Error, Result}; +use futures::StreamExt; +use futures_timer::Delay; +use serde::de::DeserializeOwned; use std::{ collections::BTreeMap, sync::{Arc, RwLock}, - time::{Duration}, + time::Duration, }; /// Internal representation for Reflector @@ -27,7 +24,8 @@ type Cache = BTreeMap; /// /// It exposes it's internal state readably through a getter. #[derive(Clone)] -pub struct Reflector where +pub struct Reflector +where K: Clone + DeserializeOwned, { data: Arc>>, @@ -37,7 +35,8 @@ pub struct Reflector where params: ListParams, } -impl Reflector where +impl Reflector +where K: Clone + DeserializeOwned, { /// Create a reflector with a kube client on a kube resource @@ -52,8 +51,8 @@ impl Reflector where } } - -impl Reflector where +impl Reflector +where K: Clone + DeserializeOwned + KubeObject, { /// Create a reflector with a kube client on a kube resource @@ -131,7 +130,6 @@ impl Reflector where Ok(()) } - /// Read data for users of the reflector /// /// TODO: deprecate in favour of a stream returning fn.. @@ -145,11 +143,7 @@ impl Reflector where // Very little that can be done in this case. Upgrade your app / resource. let cache = self.data.read().unwrap(); - Ok(cache - .values() - .cloned() - .collect::>() - ) + Ok(cache.values().cloned().collect::>()) } /// Read a single entry by name @@ -159,8 +153,8 @@ impl Reflector where /// Try `Reflector::get_within` instead. pub fn get(&self, name: &str) -> Result> { let id = ObjectId { - name: name.into(), - namespace: self.resource.namespace.clone() + name: name.into(), + namespace: self.resource.namespace.clone(), }; Ok(self.data.read().unwrap().get(&id).map(Clone::clone)) } @@ -171,8 +165,8 @@ impl Reflector where /// This is only useful if your reflector is configured to poll across namsepaces. pub fn get_within(&self, name: &str, ns: &str) -> Result> { let id = ObjectId { - name: name.into(), - namespace: Some(ns.into()) + name: name.into(), + namespace: Some(ns.into()), }; Ok(self.data.read().unwrap().get(&id).map(Clone::clone)) } @@ -188,7 +182,6 @@ impl Reflector where Ok(()) } - async fn get_full_resource_entries(&self) -> Result<(Cache, String)> { let req = self.resource.list(&self.params)?; // NB: Object isn't general enough here @@ -196,12 +189,21 @@ impl Reflector where let mut data = BTreeMap::new(); let version = res.metadata.resourceVersion.unwrap_or_else(|| "".into()); - trace!("Got {} {} at resourceVersion={:?}", res.items.len(), self.resource.resource, version); + trace!( + "Got {} {} at resourceVersion={:?}", + res.items.len(), + self.resource.resource, + version + ); for i in res.items { // The non-generic parts we care about are spec + status data.insert(i.meta().into(), i); } - let keys = data.keys().map(|key: &ObjectId| key.to_string()).collect::>().join(", "); + let keys = data + .keys() + .map(|key: &ObjectId| key.to_string()) + .collect::>() + .join(", "); debug!("Initialized with: {}", keys); Ok((data, version)) } @@ -211,7 +213,11 @@ impl Reflector where let rg = &self.resource; let oldver = { self.version.read().unwrap().clone() }; let req = rg.watch(&self.params, &oldver)?; - let res = self.client.request_events::>(req).await?; + let mut events = self + .client + .request_events::>(req) + .await? + .boxed_local(); // We use boxed_local to remove the Send requirement as we're not shipping this between threads // Update in place: let mut data = self.data.write().unwrap(); @@ -219,35 +225,37 @@ impl Reflector where // Follow docs conventions and store the last resourceVersion // https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes - for ev in res { + while let Some(ev) = events.next().await { match ev { - WatchEvent::Added(o) => { + Ok(WatchEvent::Added(o)) => { debug!("Adding {} to {}", o.meta().name, rg.resource); - data.entry(o.meta().into()) - .or_insert_with(|| o.clone()); + data.entry(o.meta().into()).or_insert_with(|| o.clone()); if let Some(v) = &o.meta().resourceVersion { *ver = v.to_string(); } - }, - WatchEvent::Modified(o) => { + } + Ok(WatchEvent::Modified(o)) => { debug!("Modifying {} in {}", o.meta().name, rg.resource); - data.entry(o.meta().into()) - .and_modify(|e| *e = o.clone()); + data.entry(o.meta().into()).and_modify(|e| *e = o.clone()); if let Some(v) = &o.meta().resourceVersion { *ver = v.to_string(); } - }, - WatchEvent::Deleted(o) => { + } + Ok(WatchEvent::Deleted(o)) => { debug!("Removing {} from {}", o.meta().name, rg.resource); data.remove(&o.meta().into()); if let Some(v) = &o.meta().resourceVersion { - *ver = v.to_string(); + *ver = v.to_string(); } } - WatchEvent::Error(e) => { + Ok(WatchEvent::Error(e)) => { warn!("Failed to watch {}: {:?}", rg.resource, e); Err(Error::Api(e))? } + Err(e) => { + // Just log out the error, but don't stop our stream short + warn!("Problem fetch event from server: {:?}", e); + } } } Ok(()) diff --git a/src/api/typed.rs b/src/api/typed.rs index 1ecac3afd..bc0a8a3aa 100644 --- a/src/api/typed.rs +++ b/src/api/typed.rs @@ -1,25 +1,14 @@ #![allow(non_snake_case)] -use either::{Either}; +use either::Either; +use futures::{Stream, StreamExt}; use serde::de::DeserializeOwned; use std::marker::PhantomData; -use crate::api::{ - RawApi, - PostParams, - DeleteParams, - ListParams, - PatchParams, - LogParams -}; -use crate::api::resource::{ - ObjectList, Object, WatchEvent, KubeObject, -}; -use crate::client::{ - APIClient, - Status, -}; -use crate::{Result}; +use crate::api::resource::{KubeObject, Object, ObjectList, WatchEvent}; +use crate::api::{DeleteParams, ListParams, LogParams, PatchParams, PostParams, RawApi}; +use crate::client::{APIClient, Status}; +use crate::Result; /// A typed Api variant that does not expose request internals /// @@ -59,7 +48,8 @@ impl Api { } /// PUSH/PUT/POST/GET abstractions -impl Api where +impl Api +where K: Clone + DeserializeOwned + KubeObject, { pub async fn get(&self, name: &str) -> Result { @@ -78,7 +68,10 @@ impl Api where let req = self.api.list(&lp)?; self.client.request::>(req).await } - pub async fn delete_collection(&self, lp: &ListParams) -> Result, Status>> { + pub async fn delete_collection( + &self, + lp: &ListParams, + ) -> Result, Status>> { let req = self.api.delete_collection(&lp)?; self.client.request_status::>(req).await } @@ -90,10 +83,18 @@ impl Api where let req = self.api.replace(name, &pp, data)?; self.client.request::(req).await } - pub async fn watch(&self, lp: &ListParams, version: &str) -> Result>> { + pub async fn watch( + &self, + lp: &ListParams, + version: &str, + ) -> Result>> { let req = self.api.watch(&lp, &version)?; - self.client.request_events::>(req).await + self.client + .request_events::>(req) + .await + .map(|stream| stream.filter_map(|e| async move { e.ok() })) } + pub async fn get_status(&self, name: &str) -> Result { let req = self.api.get_status(name)?; self.client.request::(req).await @@ -108,12 +109,12 @@ impl Api where } } - /// Marker trait for objects that has logs pub trait LoggingObject {} -impl Api where - K: Clone + DeserializeOwned + KubeObject + LoggingObject +impl Api +where + K: Clone + DeserializeOwned + KubeObject + LoggingObject, { pub async fn log(&self, name: &str, lp: &LogParams) -> Result { let req = self.api.log(name, lp)?; @@ -137,7 +138,8 @@ pub type Scale = Object; /// Scale subresource /// /// https://kubernetes.io/docs/tasks/access-kubernetes-api/custom-resources/custom-resource-definitions/#scale-subresource -impl Api where +impl Api +where K: Clone + DeserializeOwned, { pub async fn get_scale(&self, name: &str) -> Result { @@ -157,7 +159,8 @@ impl Api where /// Api Constructor for CRDs /// /// Because it relies entirely on user definitions, this ctor does not rely on openapi. -impl Api where +impl Api +where K: Clone + DeserializeOwned, { pub fn customResource(client: APIClient, name: &str) -> Self { diff --git a/src/client/mod.rs b/src/client/mod.rs index 3c2417b75..8f49d7942 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1,15 +1,16 @@ //! A basic API client with standard kube error handling -use serde_json::Value; -use either::{Right, Left}; +use crate::config::Configuration; +use crate::{Error, ErrorResponse, Result}; use either::Either; -use http::StatusCode; +use either::{Left, Right}; +use futures::StreamExt; +use futures::{self, Stream}; use http; +use http::StatusCode; use serde::de::DeserializeOwned; use serde_json; -use crate::{Error, ErrorResponse, Result}; -use crate::config::Configuration; - +use serde_json::Value; #[allow(non_snake_case)] #[derive(Deserialize, Debug)] @@ -25,7 +26,7 @@ pub struct StatusDetails { #[serde(default, skip_serializing_if = "Vec::is_empty")] pub causes: Vec, #[serde(default, skip_serializing_if = "num::Zero::is_zero")] - pub retryAfterSeconds: u32 + pub retryAfterSeconds: u32, } #[derive(Deserialize, Debug)] @@ -65,8 +66,7 @@ impl APIClient { APIClient { configuration } } - async fn send(&self, request: http::Request>) -> Result - { + async fn send(&self, request: http::Request>) -> Result { let (parts, body) = request.into_parts(); let uri_str = format!("{}{}", self.configuration.base_path, parts.uri); trace!("{} {}", parts.method, uri_str); @@ -77,19 +77,21 @@ impl APIClient { http::Method::DELETE => self.configuration.client.delete(&uri_str), http::Method::PUT => self.configuration.client.put(&uri_str), http::Method::PATCH => self.configuration.client.patch(&uri_str), - other => Err(Error::InvalidMethod(other.to_string()))? - }.headers(parts.headers).body(body).build()?; + other => Err(Error::InvalidMethod(other.to_string()))?, + } + .headers(parts.headers) + .body(body) + .build()?; //trace!("Request Headers: {:?}", req.headers()); let res = self.configuration.client.execute(req).await?; Ok(res) } - pub async fn request(&self, request: http::Request>) -> Result where T: DeserializeOwned, { - let res : reqwest::Response = self.send(request).await?; + let res: reqwest::Response = self.send(request).await?; trace!("{} {}", res.status().as_str(), res.url()); //trace!("Response Headers: {:?}", res.headers()); let s = res.status(); @@ -102,9 +104,8 @@ impl APIClient { }) } - pub async fn request_text(&self, request: http::Request>) -> Result - { - let res : reqwest::Response = self.send(request).await?; + pub async fn request_text(&self, request: http::Request>) -> Result { + let res: reqwest::Response = self.send(request).await?; trace!("{} {}", res.status().as_str(), res.url()); //trace!("Response Headers: {:?}", res.headers()); let s = res.status(); @@ -114,11 +115,14 @@ impl APIClient { Ok(text) } - pub async fn request_status(&self, request: http::Request>) -> Result> + pub async fn request_status( + &self, + request: http::Request>, + ) -> Result> where T: DeserializeOwned, { - let res : reqwest::Response = self.send(request).await?; + let res: reqwest::Response = self.send(request).await?; trace!("{} {}", res.status().as_str(), res.url()); //trace!("Response Headers: {:?}", res.headers()); let s = res.status(); @@ -129,10 +133,12 @@ impl APIClient { let v: Value = serde_json::from_str(&text)?; if v["kind"] == "Status" { trace!("Status from {}", text); - Ok(Right(serde_json::from_str::(&text).map_err(|e| { - warn!("{}, {:?}", text, e); - Error::SerdeError(e) - })?)) + Ok(Right(serde_json::from_str::(&text).map_err( + |e| { + warn!("{}, {:?}", text, e); + Error::SerdeError(e) + }, + )?)) } else { Ok(Left(serde_json::from_str::(&text).map_err(|e| { warn!("{}, {:?}", text, e); @@ -141,27 +147,76 @@ impl APIClient { } } - pub async fn request_events(&self, request: http::Request>) -> Result> + pub async fn request_events( + &self, + request: http::Request>, + ) -> Result>> where T: DeserializeOwned, { - let res : reqwest::Response = self.send(request).await?; + let res: reqwest::Response = self.send(request).await?; trace!("{} {}", res.status().as_str(), res.url()); - //trace!("Response Headers: {:?}", res.headers()); - let s = res.status(); - let text = res.text().await?; - handle_api_errors(&text, &s)?; - // Should be able to coerce result into Vec at this point - let mut xs : Vec = vec![]; - for l in text.lines() { - let r = serde_json::from_str(&l).map_err(|e| { - warn!("{} {:?}", l, e); - Error::SerdeError(e) - })?; - xs.push(r); - } - Ok(xs) + // Now use `unfold` to convert the chunked responses into a Stream + // We first construct a Stream of Vec> as we potentially might need to + // yield multiple objects per loop, then we flatten it to the Stream> as expected. + // Any reqwest errors will terminate this stream early. + let stream = futures::stream::unfold((res, Vec::new()), |(mut resp, mut buff)| { + async { + loop { + match resp.chunk().await { + Ok(Some(chunk)) => { + buff.extend_from_slice(&chunk); + + // If we've encountered a newline, see if we have any items to yield + if chunk.contains(&b'\n') { + let mut new_buff = Vec::new(); + let mut items = Vec::new(); + + // Split on newlines + for line in buff.split(|x| x == &b'\n') { + new_buff.extend_from_slice(&line); + + match serde_json::from_slice(&new_buff) { + Ok(val) => { + // on success clear our buffer + new_buff.clear(); + items.push(Ok(val)); + } + Err(e) => { + // If this is not an eof error it's a parse error + // so log it and store it + // Otherwise we don't do anything as we've already + // added in the current partial line to our buffer for + // use in the next loop + if !e.is_eof() { + warn!( + "Failed to parse: {}", + String::from_utf8_lossy(line) + ); + // Clear the buffer as this was a valid object + new_buff.clear(); + items.push(Err(Error::SerdeError(e))); + } + } + } + } + + // Now return our items and loop + return Some((items, (resp, new_buff))); + } + } + Ok(None) => return None, + Err(e) => { + let err = vec![Err(Error::ReqwestError(e))]; + return Some((err, (resp, buff))); + } + } + } + } + }); + + Ok(stream.map(futures::stream::iter).flatten()) } } @@ -186,7 +241,7 @@ fn handle_api_errors(text: &str, s: &StatusCode) -> Result<()> { status: s.to_string(), code: s.as_u16(), message: format!("{:?}", text), - reason: "Failed to parse error data".into() + reason: "Failed to parse error data".into(), }; debug!("Unsuccessful: {:?} (reconstruct)", ae); Err(Error::Api(ae)) diff --git a/src/lib.rs b/src/lib.rs index 4d8ef1c06..e54ec94ff 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,9 @@ use thiserror::Error; -#[macro_use] extern crate serde_derive; -#[macro_use] extern crate log; +#[macro_use] +extern crate serde_derive; +#[macro_use] +extern crate log; #[derive(Error, Deserialize, Serialize, Debug, Clone, Eq, PartialEq)] #[error("{message}: {reason}")] @@ -14,8 +16,6 @@ pub struct ErrorResponse { pub code: u16, } - - #[derive(Error, Debug)] pub enum Error { /// ApiError for when things fail @@ -60,7 +60,7 @@ pub enum Error { pub type Result = std::result::Result; +pub mod api; pub mod client; pub mod config; -pub mod api; mod oauth2;