Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async stream for watch events #83

Closed
clux opened this issue Oct 28, 2019 · 8 comments
Closed

async stream for watch events #83

clux opened this issue Oct 28, 2019 · 8 comments
Labels
help wanted Not immediately prioritised, please help!

Comments

@clux
Copy link
Member

clux commented Oct 28, 2019

One of the things that would be the biggest usability/performance improvements would be the being able to return watch events as they occur (without buffering the entire timeout call internally) and instead return some kind of stream from request_events.

Goofed a little bit around this using https://github.com/tokio-rs/async-stream in #82 but I might not include this for the intial async/await rewrite as it seems rather hairy.

Here is the subset of the diff I was trying to get working on top of #82 👍

diff --git Cargo.toml Cargo.toml
index f1dd2672..c3dd6e5f 100644
--- Cargo.toml
+++ Cargo.toml
@@ -29,6 +29,7 @@ log = "0.4.8"
 time = "0.1.42"
 either = "1.5.3"
 tokio = { version = "=0.2.0-alpha.6", default-features = false, features = ["timer"] }
+async-stream = "0.1.2"
 
 [features]
 default = []
diff --git src/client/mod.rs src/client/mod.rs
index beffca24..9f7b71d4 100644
--- src/client/mod.rs
+++ src/client/mod.rs
@@ -1,5 +1,6 @@
 //! A basic API client with standard kube error handling
 
+use tokio::prelude::Stream;
 use serde_json::Value;
 use either::{Right, Left};
 use either::Either;
@@ -7,6 +8,8 @@ use http::StatusCode;
 use http;
 use serde::de::DeserializeOwned;
 use serde_json;
+use async_stream::try_stream;
+
 use failure::ResultExt;
 use crate::{ApiError, Error, ErrorKind, Result};
 use crate::config::Configuration;
@@ -142,27 +145,28 @@ impl APIClient {
         }
     }
 
-    pub async fn request_events<T>(&self, request: http::Request<Vec<u8>>) -> Result<Vec<T>>
+    pub fn request_events<T>(&self, request: http::Request<Vec<u8>>) -> impl Stream<Item = Result<T>>
     where
-        T: DeserializeOwned,
+        T: DeserializeOwned + Unpin,
     {
-        let res : reqwest::Response = self.send(request).await?;
-        trace!("{} {}", res.status().as_str(), res.url());
-        //trace!("Response Headers: {:?}", res.headers());
-        let s = res.status();
-        let text = res.text().await.context(ErrorKind::RequestParse)?;
-        handle_api_errors(&text, &s)?;
-
-        // Should be able to coerce result into Vec<T> at this point
-        let mut xs : Vec<T> = vec![];
-        for l in text.lines() {
-            let r = serde_json::from_str(&l).map_err(|e| {
-                warn!("{} {:?}", l, e);
-                Error::from(ErrorKind::SerdeParse)
-            })?;
-            xs.push(r);
+        try_stream! {
+            let res : reqwest::Response = self.send(request).await.map_err(Error::into)?;
+            //trace!("{} {}", res.status().as_str(), res.url());
+            //trace!("Response Headers: {:?}", res.headers());
+            //let s = res.status();
+            //if s.is_client_error() || s.is_server_error() {
+            //    let text = res.text().await.context(ErrorKind::RequestParse)?;
+            //    handle_api_errors(&text, &s)?;
+            //}
+            while let Some(l) = res.chunk().await.map_err(|_e| Error::from(ErrorKind::RequestSend))? {
+                trace!("Chunk: {:?}", l);
+                let r : T = serde_json::from_slice(&l).map_err(|e| {
+                    warn!("{} {:?}",  String::from_utf8_lossy(&l), e);
+                    Error::from(ErrorKind::SerdeParse)
+                })?;
+                yield r;
+            }
         }
-        Ok(xs)
     }
 }

ultimately, got stopped short of weird macro-origin lifetime issues that I struggled to debug. if anyone have any ideas or suggestions for moving forward with a stream interface here, that would be really helpful.

@clux clux added enhancement help wanted Not immediately prioritised, please help! labels Oct 28, 2019
@clux
Copy link
Member Author

clux commented Nov 6, 2019

Tried to do this manually, by following seanmonstar/reqwest#682 roughly, but could not make the signature allow both an early failure and return a stream.

I did manage to make one helper function with the correct signature, but it didn't get me to the end:

    pub fn watch_unfold<T>(res: reqwest::Response) -> impl Stream<Item = Result<T>>
    where
        T: DeserializeOwned + Unpin
    {
        futures::stream::unfold(res, |mut resp| async move {
            match resp.chunk().await {
                Ok(Some(l)) => {
                    trace!("Chunk: {:?}", l);
                    return match serde_json::from_slice(&l) {
                        Ok(t) => Some((Ok(t), resp)),
                        Err(e) => {
                            warn!("{} {:?}",  String::from_utf8_lossy(&l), e);
                            Some((Err(Error::from(ErrorKind::SerdeParse)), resp))
                        },
                    }
                },
                Ok(None) => return None,
                Err(err) => return Some((Err(Error::from(ErrorKind::RequestSend)), resp)),
            }
        })
    }
}
+futures-util = "0.3.0"
+futures = "0.3.0"
+use futures::future::TryFutureExt; // producer
+use futures::{future, stream, Future, Stream};
+use futures_util::TryStreamExt; // consumer

@milesgranger
Copy link

Just to clarify, I suppose this is the reason I'm seeing this when trying to upgrade to 0.18.0 (after updating all my async/awaits)?

if let Some(item) = self.body.next().await {
|                   ^^^^^^^^^^^^^^^^^^^^^^ the trait `futures_core::stream::Stream` is not implemented for `futures_util::stream::peek::Peekable<async_impl::decoder::imp::IoStream>`

@clux
Copy link
Member Author

clux commented Nov 11, 2019

Yeah, the API is not yet to await each individual element. You currently need to await the poll:

https://github.com/clux/kube-rs/blob/41dbdee8eb72c82b3e41a7a2d183103f303e2e9c/examples/pod_informer.rs#L24-L30

@kitfre
Copy link

kitfre commented Nov 22, 2019

Looking at your example above using unfold, I was able to make it work (seemingly without any changes from your example even?) Source here

I also went ahead and updated the rest of the code that needed updating with this change and the jobs example. If this is the sort of implementation you're looking for I'd be happy to make a PR and then iterate on it, though I might be slow over the next few days due to travel.

@clux
Copy link
Member Author

clux commented Nov 22, 2019

Oh, nice! I was missing how to get the function that returned impl Stream to handle a step before the unfold, and the compiler errors looked dense. It's awesome if you've actually managed to make it work!

Happy to take a pr and help it get to a final state. I am also travelling a bit, but will try to at least respond ASAP.

Interesting that watch needs to return a boxed mut stream. Do you know the reason for that?

@kitfre
Copy link

kitfre commented Nov 22, 2019

Sounds good to me, PR on the way! As for the mut boxed stream, the mut is so that we can call .next().await using StreamExt, and the boxed is to get around compiler complaints of the stream not being Unpin. I'm not entirely sure why it's not Unpin but I've run into that before in some other work, and stumbled across boxed as a solution to that.

@clux
Copy link
Member Author

clux commented Nov 22, 2019

Oh, that makes sense! I've seen people require Unpin on the input types to avoid this. We could try to require Unpin on K in the various trait requirements on Informer etc. This might help avoid the boxed call. I think this is fine as we're just moving out raw data, and kube objects don't contain anything but yaml specifieable data.

@clux
Copy link
Member Author

clux commented Nov 29, 2019

Closed in #92 and kube 0.21.0

@clux clux closed this as completed Nov 29, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
help wanted Not immediately prioritised, please help!
Projects
None yet
Development

No branches or pull requests

3 participants