
Return a Stream<Item=Result<T>> from request_events #92

Merged (25 commits) on Nov 29, 2019

Conversation

@kitfre commented Nov 22, 2019

Altered the client's request_events method to return a Stream and updated dependent code.
#83

@kitfre (Author) commented Nov 22, 2019

Ahh, it seems I had rustfmt-on-save enabled, which resulted in more changes than just the code. Let me know if you want me to revert that; I can make a fresh, non-rustfmt'd version.

@clux (Member) left a comment

initial comments. looks great!

@clux (Member) commented Nov 22, 2019

> Ahh, it seems I had rustfmt-on-save enabled, which resulted in more changes than just the code. Let me know if you want me to revert that; I can make a fresh, non-rustfmt'd version.

I have a little more tolerance for long lines in signatures, but otherwise the changes are appropriate. I don't mind the changes being kept in; at least you're only changing the files you touch.

@clux (Member) left a comment

nice work. some questions.

@clux (Member) left a comment

Looking good! I will have a quick sanity check on your branch then merge :-)

@clux (Member) commented Nov 23, 2019

Hah, I think the stuff that breaks is my original unfold code.

It looks like all informers break inside the client's unfold during deserialization. It appears the chunks aren't whole objects, but arbitrarily cut-off API chunks. We might need another level of buffering :|

cargo run --example node_informer --all-features
    Finished dev [unoptimized + debuginfo] target(s) in 0.07s
     Running `target/debug/examples/node_informer`
[2019-11-23T03:46:43Z DEBUG kube::api::informer] Got fresh resourceVersion=203267648 for nodes
[2019-11-23T03:46:43Z INFO  kube::api::informer] Starting Informer for RawApi { resource: "nodes", group: "", namespace: None, version: "v1", prefix: "api" }
[2019-11-23T03:46:44Z WARN  kube::client] {"type":"MODIFIED","object":{"kind":"Node","apiVersion":"v1","metadata":{"name":"ip-10-100-20-XX.eu-west-2.compute.internal","selfLink":"/api/v1/nodes/ip-10-100-20-XX.eu-west-2.compute.internal","uid":"934","resourceVersion":"203267649","creationTimestamp":"2019-10-31T14:06:03Z","labels":{"beta.kubernetes.io/arch":"amd64","beta.kubernetes.io/instance-type":"m4.2xlarge","beta.kubernetes.io/os":"linux","failure-domain.beta.kubernetes.io/region":"eu-west-2","failure-domain.beta.kubernetes.io/zone":"eu-west-2a","kubernetes.io/hostname":"ip-10-100-20-XX.eu-west-2.compute.internal","kubernetes.io/lifecycle":"spot"},"annotations":{"node.alpha.kubernetes.io/ttl":"15","volumes.kubernetes.io/controller-managed-attach-detach":"true"}},"spec":{"providerID":"aws:///eu-west-2a/i-0a"},"status":{"capacity":{"attachable-volumes-aws-ebs":"39","cpu":"8","ephemeral-storage":"104845292Ki","hugepages-1Gi":"0","hugepages-2Mi":"0","memory":"32939304Ki","pods":"58"},"allocatable":{"attachable-volumes-aws-ebs":"39","cpu":"8","ephemeral-storage":"96625420948","hugepages-1Gi":"0","hugepages-2Mi":"0","memory":"32836904Ki","pods":"58"},"conditions":[{"type":"MemoryPressure","status":"False","lastHeartbeatTime":"2019-11-23T03:46:43Z","lastTransitionTime":"2019-10-31T14:06:02Z","reason":"KubeletHasSufficientMemory","message":"kubelet has sufficient memory available"},{"type":"DiskPressure","status":"False","lastHeartbeatTime":"2019-11-23T03:46:43Z","lastTransitionTime":"2019-11-18T16:22:13Z","reason":"KubeletHasNoDiskPressure","message":"kubelet has no disk pressure"},{"type":"PIDPressure","status":"False","lastHeartbeatTime":"2019-11-23T03:46:43Z","lastTransitionTime":"2019-10-31T14:06:02Z","reason":"KubeletHasSufficientPID","message":"kubelet has sufficient PID available"},{"type":"Ready","status":"True","lastHeartbeatTime":"2019-11-23T03:46:43Z","lastTransitionTime":"2019-10-31T14:06:33Z","reason":"KubeletReady","message":"kubelet is posting ready status"}],"addresses":[{"type":"InternalIP","address":"10.100.20.XX"},{"type":"Hostname","address":"ip-10-100-20-XX.eu-west-2.compute.internal"},{"type":"InternalDNS","address":"ip-10-100-20-XX.eu-west-2.compute.internal"}],"daemonEndpoints":{"kubeletEndpoint":{"Port":10250}},"nodeInfo":{"machineID":"d3b4a9efe","systemUUID":"EC236-FFCBBEBBF528","bootID":"13f2134","kernelVersion":"4.14.146-120.181.amzn2.x86_64","osImage":"Amazon Linux 2","containerRuntimeVersion":"docker://18.6.1","kubeletVersion":"v1.13.11-eks-5876d6","kubeProxyVersion":"v1.13.11-eks-5876d6","operatingSystem":"linux","architecture":"amd64"},"images":[{"names":["quay.io/clux/blah@sha256:c52fb83c5ec98b860045b8a7263fb2fa18816c9cc5881a60b19c81efff25f554","quay.io/clux/blah:d077e73bb61d02c0e8834c99c816bb69fc4ada82"],"sizeBytes":1703357563},{"names":["quay.io/clux/blah@sha256:8b26f68b4202fcf82ea38042fc46f1d79c3fdbd96ebc9abbd186d3116d4aec07","quay.io/clux/blah:ac1491a09abac827de7d073e8b2c5e76a1622a58"],"sizeBytes":1702357767},{"names":["quay.io/clux/blah@sha256:82a1e4076a2178d36de8af9497ef2d0682dc7a31a5d113e17e14d585e5637f09","quay.io/clux/blah:99e9c711e5dzzz24babd9aaacf540ad7ab5a3d8a"],"sizeBytes":1702355496},{"names":["quay.io Error("EOF while parsing a string", line: 1, column: 3844)
Error: Error deserializing response

Caused by:
    EOF while parsing a string at line 1 column 3844
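
To make the failure mode concrete, here is a minimal, self-contained sketch of the extra buffering being discussed: accumulate raw byte chunks and only emit complete newline-delimited frames, which can then be deserialized one at a time. This is illustrative only (plain futures + std, no kube types), not the code that ended up in the client.

use futures::stream::{self, Stream, StreamExt};

// Hypothetical helper: turn a stream of raw byte chunks into a stream of
// complete newline-delimited frames, carrying partial lines over between chunks.
fn buffered_lines<S>(chunks: S) -> impl Stream<Item = String>
where
    S: Stream<Item = Vec<u8>>,
{
    chunks
        .scan(Vec::new(), |buf, chunk| {
            buf.extend_from_slice(&chunk);
            let mut complete = Vec::new();
            // Split off every complete line; any partial trailing line stays in
            // `buf` so the next chunk can finish it.
            while let Some(pos) = buf.iter().position(|&b| b == b'\n') {
                let line: Vec<u8> = buf.drain(..=pos).collect();
                if let Ok(s) = String::from_utf8(line) {
                    let trimmed = s.trim().to_string();
                    if !trimmed.is_empty() {
                        complete.push(trimmed);
                    }
                }
            }
            futures::future::ready(Some(stream::iter(complete)))
        })
        .flatten()
}

fn main() {
    // Two chunks that split a watch event mid-object, as in the log above.
    let chunks = stream::iter(vec![
        b"{\"type\":\"MODIFIED\",\"obj".to_vec(),
        b"ect\":{}}\n".to_vec(),
    ]);
    let frames = futures::executor::block_on(buffered_lines(chunks).collect::<Vec<String>>());
    assert_eq!(frames, vec!["{\"type\":\"MODIFIED\",\"object\":{}}"]);
}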

@clux (Member) commented Nov 23, 2019

I've been stress-testing the pod_informer on a busy namespace and upping the default Informer timeout to 5 minutes (which looks more standard, judging by the kubernetes code):

diff --git a/src/api/informer.rs b/src/api/informer.rs
index fe952786..7f0ead47 100644
--- a/src/api/informer.rs
+++ b/src/api/informer.rs
@@ -44,7 +44,7 @@ where
         Informer {
             client: r.client,
             resource: r.api,
-            params: ListParams::default(),
+            params: ListParams { timeout: Some(60*5), ..ListParams::default() },
             events: Arc::new(RwLock::new(VecDeque::new())),
             version: Arc::new(RwLock::new(0.to_string())),
         }

It seems to be running super smoothly!
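
For reference, the same value can also be set per informer at the call site via the timeout builder used in the pod_informer diff further down this thread. A sketch only, with setup lines modeled on the crate's examples from this period (the exact imports and config helpers are assumptions, not verified against this branch):

use kube::{api::{Api, Informer}, client::APIClient, config};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Assumed setup, mirroring the examples of that era.
    let config = config::load_kube_config().await?;
    let client = APIClient::new(config);

    // Per-informer override instead of changing the crate-wide default.
    let resource = Api::v1Pod(client).within("default");
    let _inf = Informer::new(resource).timeout(60 * 5).init().await?;
    Ok(())
}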

@clux (Member) commented Nov 23, 2019

    {"type":"MODIFIED","object":{"kind":"Node","apiVersion":"v1","metadata":{"name":"ip-10-100-79-XX.eu-west-2.compute.internal","selfLink":"/api/v1/nodes/ip-10-100-79-XX.eu-west-2.compute.internal","uid":"60246186-0610-11ea-86ea-0a746bc42d00","resourceVersion":"204270168","creationTimestamp":"2019-11-13T12:23:19Z","labels":{"beta.kubernetes.io/arch":"amd64","beta.kubernetes.io/instance-type":"m4.2xlarge","beta.kubernetes.io/os":"linux","failure-domain.beta.kubernetes.io/region":"eu-west-2","failure-domain.beta.kubernetes.io/zone":"eu-west-2b","kubernetes.io/hostname":"ip-10-100-79-XX.eu-west-2.compute.internal","kubernetes.io/lifecycle":"spot"},"annotations":{"node.alpha.kubernetes.io/ttl":"15","volumes.kubernetes.io/controller-managed-attach-detach":"true"}},"spec":{"providerID":"aws:///eu-west-2b/i-053b88bf5db272bf6"},"status":{"capacity":{"attachable-volumes-aws-ebs":"39","cpu":"8","ephemeral-storage":"104845292Ki","hugepages-1Gi":"0","hugepages-2Mi":"0","memory":"32939304Ki","pods":"58"},"allocatable":{"attachable-volumes-aws-ebs":"39","cpu":"8","ephemeral-storage":"96625420948","hugepages-1Gi":"0","hugepages-2Mi":"0","memory":"32836904Ki","pods":"58"},"conditions":[{"type":"MemoryPressure","status":"False","lastHeartbeatTime":"2019-11-23T22:18:29Z","lastTransitionTime":"2019-11-13T12:23:18Z","reason":"KubeletHasSufficientMemory","message":"kubelet has sufficient memory available"},{"type":"DiskPressure","status":"False","lastHeartbeatTime":"2019-11-23T22:18:29Z","lastTransitionTime":"2019-11-15T10:50:14Z","reason":"KubeletHasNoDiskPressure","message":"kubelet has no disk pressure"},{"type":"PIDPressure","status":"False","lastHeartbeatTime":"2019-11-23T22:18:29Z","lastTransitionTime":"2019-11-13T12:23:18Z","reason":"KubeletHasSufficientPID","message":"kubelet has sufficient PID available"},{"type":"Ready","status":"True","lastHeartbeatTime":"2019-11-23T22:18:29Z","lastTransitionTime":"2019-11-13T12:23:39Z","reason":"KubeletReady","message":"kubelet is posting ready status"}],"addresses":[{"type":"InternalIP","address":"10.100.79.XX"},{"type":"Hostname","address":"ip-10-100-79-XX.eu-west-2.compute.internal"},{"type":"InternalDNS","address":"ip-10-100-79-XX.eu-west-2.compute.internal"}],"daemonEndpoints":{"kubeletEndpoint":{"Port":10250}},"nodeInfo":{"machineID":"88320e99c2164094a18a5394961d33b5","systemUUID":"EC24C590-C896-19D7-6ADC-2CAC012D19CB","bootID":"9794d1ce-bfba-4376-b621-ef2d80683e8d","kernelVersion":"4.14.146-120.181.amzn2.x86_64","osImage":"Amazon Linux 2","containerRuntimeVersion":"docker://18.9.9","kubeletVersion":"v1.13.11-eks-5876d6","kubeProxyVersion":"v1.13.11-eks-5876d6","operatingSystem":"linux","architecture":"amd64"},"images":[]}}}
     Error("trailing characters", line: 2, column: 1)
Error: Error deserializing response

Caused by:
    trailing characters at line 2 column 1

This MIGHT be fixed by a trim, because it looks like we have 3 leading whitespace characters for some reason. It might also be because buf.lines() is smarter than just splitting on \n.
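
A tiny self-contained reproduction of that failure mode with serde_json (illustrative only; the string literals here are made up):

fn main() {
    // A frame containing one complete object plus the start of the next one
    // triggers the "trailing characters" error seen above.
    let frame = "{\"type\":\"MODIFIED\",\"object\":{}}\n   {\"ty";
    assert!(serde_json::from_str::<serde_json::Value>(frame).is_err());

    // Splitting strictly on '\n' and trimming each complete line sidesteps both
    // the stray whitespace and the trailing content.
    let first = frame.split('\n').next().unwrap().trim();
    let v: serde_json::Value = serde_json::from_str(first).unwrap();
    assert_eq!(v["type"], "MODIFIED");
}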

@kitfre (Author) left a comment

> I've been stress-testing the pod_informer on a busy namespace and upping the default Informer timeout to 5 minutes (which looks more standard, judging by the kubernetes code):
>
> diff --git a/src/api/informer.rs b/src/api/informer.rs
> index fe952786..7f0ead47 100644
> --- a/src/api/informer.rs
> +++ b/src/api/informer.rs
> @@ -44,7 +44,7 @@ where
>          Informer {
>              client: r.client,
>              resource: r.api,
> -            params: ListParams::default(),
> +            params: ListParams { timeout: Some(60*5), ..ListParams::default() },
>              events: Arc::new(RwLock::new(VecDeque::new())),
>              version: Arc::new(RwLock::new(0.to_string())),
>          }
>
> It seems to be running super smoothly!

Do you want me to include this change in the PR?

@clux (Member) commented Nov 25, 2019

> Do you want me to include this change in the PR?

Don't worry about that, I need to find some sensible numbers across the crate anyway.

@clux (Member) left a comment

comments

@clux (Member) commented Nov 25, 2019

I've been testing the Informer setup a bit and have noticed one regression: we no longer handle automatic re-watching. A 410 error as a WatchEvent is now just bubbled up to the app, and the app can't do anything about it because it doesn't have the resourceVersion. We solved this before in Informer::poll in the error case, but that doesn't cover WatchEvent::Error.

We need something like this:

diff --git a/src/api/informer.rs b/src/api/informer.rs
index fe952786..10cea2da 100644
--- a/src/api/informer.rs
+++ b/src/api/informer.rs
@@ -149,7 +149,10 @@ where
                         Ok(WatchEvent::Added(o))
                         | Ok(WatchEvent::Modified(o))
                         | Ok(WatchEvent::Deleted(o)) => o.meta().resourceVersion.clone(),
-                        _ => None,
+                        Ok(WatchEvent::Error(e)) => {
+                            // TODO: trigger self.reset() here somehow
+                            // annoying because it requires await at this point
+                            None
+                        },
+                        _ => None,
                     };
 
                     // Update our version need be

@clux (Member) left a comment

Some more suggestions, but it doesn't seem to crash anymore.

@clux (Member) commented Nov 28, 2019

The only real thing remaining here is seeing if we can do anything about automatic rewatching: #92 (comment). Do you want to have a look at that or shall I investigate?

@clux (Member) left a comment

Nice work. I'm impressed you've managed to thread this through this way. I have left some thoughts on exposing the error event, though; I think we might be able to simplify the interface a bit.

src/api/informer.rs
// Sleep for 10s before allowing a retry
let dur = Duration::from_secs(10);
Delay::new(dur).await;
}
@clux (Member) commented:

Maybe we can avoid this convoluted structure and do this at the beginning of the Informer::poll if needs_resync is true?

@kitfre (Author) replied:

That's a lot simpler, thanks! I've updated the code as such. Now when we run into an error that needs resetting, we mark the needs_resync flag, and then on the next call to poll we wait a bit and reset our resource version. I appreciate all the code review; I know it's a lot of work, but I've definitely learned a lot working on this 🔥 🦀
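
For readers following the thread, here is a minimal self-contained sketch of that flow, using futures-timer's Delay as in the snippet quoted above; the struct and field names only mirror the discussion (the real ones live in src/api/informer.rs):

use std::sync::{Arc, RwLock};
use std::time::Duration;

use futures_timer::Delay;

// Stand-in type for illustration; not the actual Informer.
struct Poller {
    needs_resync: Arc<RwLock<bool>>,
    version: Arc<RwLock<String>>,
}

impl Poller {
    async fn reset(&self) {
        // The real reset re-lists to obtain a fresh resourceVersion.
        *self.version.write().unwrap() = "0".to_string();
    }

    async fn poll(&self) {
        // Handle any pending resync up front, before starting the next watch.
        if *self.needs_resync.read().unwrap() {
            Delay::new(Duration::from_secs(10)).await; // back off before retrying
            self.reset().await;
            *self.needs_resync.write().unwrap() = false;
        }
        // ...then issue the watch request with the current version, as before.
    }
}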

@clux (Member) commented Nov 29, 2019

Here's a small change to at least handle the 410 Gone correctly:

diff --git a/examples/pod_informer.rs b/examples/pod_informer.rs
index 7da0fcdb..a660b63b 100644
--- a/examples/pod_informer.rs
+++ b/examples/pod_informer.rs
@@ -19,7 +19,7 @@ async fn main() -> anyhow::Result<()> {
     let namespace = env::var("NAMESPACE").unwrap_or("default".into());
 
     let resource = Api::v1Pod(client.clone()).within(&namespace);
-    let inf = Informer::new(resource.clone()).init().await?;
+    let inf = Informer::new(resource.clone()).timeout(300).init_from("212710302".into());
 
     // Here we both poll and reconcile based on events from the main thread
     // If you run this next to actix-web (say), spawn a thread and pass `inf` as app state
diff --git a/src/api/informer.rs b/src/api/informer.rs
index de4dfd09..e0417b4d 100644
--- a/src/api/informer.rs
+++ b/src/api/informer.rs
@@ -164,18 +164,24 @@ where
                         Ok(WatchEvent::Added(o))
                         | Ok(WatchEvent::Modified(o))
                         | Ok(WatchEvent::Deleted(o)) => o.meta().resourceVersion.clone(),
-                        _ => None,
+                        Ok(WatchEvent::Error(e)) => {
+                            if e.code == 410 {
+                                warn!("Stream desynced: {:?}", e);
+                                *needs_resync.write().unwrap() = true; // resync
+                            }
+                            None
+                        },
+                        Err(e) => {
+                            warn!("Unexpected watch error: {:?}", e);
+                            None
+                        },
                     };
 
-                    // If we hit an error, mark that we need to resync on the next call
-                    if let Err(e) = &event {
-                        warn!("Poll error: {:?}", e);
-                        *needs_resync.write().unwrap() = true;
-                    }
                     // Update our version need be
                     // Follow docs conventions and store the last resourceVersion
                     // https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
-                    else if let Some(nv) = new_version {
+                    if let Some(nv) = new_version {
                         *version.write().unwrap() = nv;
                     }

(the pod_informer example change is only there so I can test it)

@clux (Member) commented Nov 29, 2019

I think we are good now? Everything appears to be working; no big shortcomings with the interface AFAICT.

It's possible we can make the reflector examples a little better by spawning poll in an async task so we can bump our watch timeout to 300s globally, but this really needs to be released :-)

You happy to merge?

@kitfre (Author) commented Nov 29, 2019

> You happy to merge?

Sounds good to me 🎉

@clux (Member) commented Nov 29, 2019

Ok, I will prepare a release!

But first: thank you so much for all your help with this! This is probably the most significant contribution I have received in the open-source world, so I really appreciate it.

@clux merged commit bfaec46 into kube-rs:master on Nov 29, 2019
@clux (Member) commented Nov 29, 2019

Published in [email protected] 🎉
