Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add WebSocket connection support and implement /attach and /exec #360

Merged
merged 24 commits into from
Jan 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
1fb423b
Decouple Config from reqwest
kazk Dec 26, 2020
5a1b4df
Add `client.connect(Request)` to make WebSocket connection
kazk Dec 28, 2020
557c125
Implement outputs from Pod `/attach`
kazk Dec 28, 2020
be111c2
Set `sec-websocket-protocol` header
kazk Dec 30, 2020
8354966
Add `AttachedProcess` for attach and exec
kazk Dec 30, 2020
5dfff99
Add `/exec` subresource
kazk Dec 30, 2020
d4eb679
Rename AttachingObject to AttachableObject
kazk Dec 31, 2020
ee45eda
Fix confusing examples
kazk Dec 31, 2020
3028931
Add builder methods for Attach/Exec Params
kazk Dec 31, 2020
220773e
Include links to code describing the subprotocol
kazk Dec 31, 2020
19b11fa
Move code for AsyncTlsConnector into a separate module
kazk Dec 31, 2020
75101a5
Add validation for Attach/Exec Params
kazk Dec 31, 2020
6ddcb3d
Remove ExecParams
kazk Dec 31, 2020
2225dca
Rename api/streaming to api/remote_command
kazk Dec 31, 2020
3e98246
Improve docs for AttachParams and AttachedProcess
kazk Dec 31, 2020
62bbac0
Add tests for Resource::attach/exec
kazk Dec 31, 2020
43521c3
Refactor message loop
kazk Jan 1, 2021
a1fff2d
Remove unnecessary Error variants
kazk Jan 1, 2021
56c3336
Make internal max buffer size configurable
kazk Jan 1, 2021
2ca61aa
Add myself to the authors list
kazk Jan 1, 2021
d12e6eb
Require tokio v0.2.24
kazk Jan 1, 2021
2239d34
Improve WebSocket connection error handling
kazk Jan 1, 2021
698321a
Change `AttachedProcess` `stdout()`/`stderr()` to return readers
kazk Jan 1, 2021
ffa0c57
Add pod_shell example
kazk Jan 2, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@ version = "0.1.0"
authors = [
"clux <[email protected]>",
"Teo Klestrup Röijezon <[email protected]>",
"kazk <[email protected]>",
]
publish = false
edition = "2018"

[features]
default = ["native-tls", "schema", "kubederive"]
default = ["native-tls", "schema", "kubederive", "ws"]
kubederive = ["kube/derive"] # by default import kube-derive with its default features
schema = ["kube-derive/schema"] # crd_derive_no_schema shows how to opt out
native-tls = ["reqwest/native-tls", "kube/native-tls", "kube-runtime/native-tls"]
rustls-tls = ["reqwest/rustls-tls", "kube/rustls-tls", "kube-runtime/rustls-tls"]
ws = ["kube/ws", "kube/ws-native-tls"]

[dev-dependencies]
anyhow = "1.0.32"
Expand All @@ -27,7 +29,7 @@ log = "0.4.11"
serde = { version = "1.0.111", features = ["derive"] }
serde_json = "1.0.57"
serde_yaml = "0.8.14"
tokio = { version = "0.2.22", features = ["full"] }
tokio = { version = "0.2.24", features = ["full"] }
color-eyre = "0.5.1"
snafu = { version = "0.6.8", features = ["futures"] }
# Some Api::delete methods use Either
Expand Down Expand Up @@ -101,6 +103,18 @@ path = "multi_watcher.rs"
name = "pod_api"
path = "pod_api.rs"

[[example]]
name = "pod_attach"
path = "pod_attach.rs"

[[example]]
name = "pod_exec"
path = "pod_exec.rs"

[[example]]
name = "pod_shell"
path = "pod_shell.rs"

[[example]]
name = "proxy"
path = "proxy.rs"
Expand Down
115 changes: 115 additions & 0 deletions examples/pod_attach.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
#[macro_use]
extern crate log;

use std::io::Write;

use futures::{join, stream, StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::Pod;

use kube::{
api::{Api, AttachParams, AttachedProcess, DeleteParams, ListParams, Meta, PostParams, WatchEvent},
Client,
};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "info,kube=debug");
env_logger::init();
let client = Client::try_default().await?;
let namespace = std::env::var("NAMESPACE").unwrap_or_else(|_| "default".into());

info!("Creating a Pod that outputs numbers for 15s");
let p: Pod = serde_json::from_value(serde_json::json!({
"apiVersion": "v1",
"kind": "Pod",
"metadata": { "name": "example" },
"spec": {
"containers": [{
"name": "example",
"image": "alpine",
"command": ["sh", "-c", "for i in `seq 15`; do if [ $i -lt 7 ]; then echo \"o $i\"; else echo \"e $i\" 1>&2; fi; sleep 1; done;"],
}],
}
}))?;

let pods: Api<Pod> = Api::namespaced(client, &namespace);
// Stop on error including a pod already exists or is still being deleted.
pods.create(&PostParams::default(), &p).await?;

// Wait until the pod is running, otherwise we get 500 error.
let lp = ListParams::default().fields("metadata.name=example").timeout(10);
let mut stream = pods.watch(&lp, "0").await?.boxed();
while let Some(status) = stream.try_next().await? {
match status {
WatchEvent::Added(o) => {
info!("Added {}", Meta::name(&o));
}
WatchEvent::Modified(o) => {
let s = o.status.as_ref().expect("status exists on pod");
if s.phase.clone().unwrap_or_default() == "Running" {
info!("Ready to attach to {}", Meta::name(&o));
break;
}
}
_ => {}
}
}

let ap = AttachParams::default();
// Attach to see numbers printed on stdout.
let attached = pods.attach("example", &ap).await?;
// Separate stdout/stderr outputs
separate_outputs(attached).await;
// Combining stdout and stderr output.
// combined_output(proc).await;

// Delete it
pods.delete("example", &DeleteParams::default())
.await?
.map_left(|pdel| {
assert_eq!(Meta::name(&pdel), "example");
});

Ok(())
}

#[allow(dead_code)]
async fn separate_outputs(mut attached: AttachedProcess) {
let stdout = tokio::io::reader_stream(attached.stdout().unwrap());
let stdouts = stdout.for_each(|res| async {
if let Ok(bytes) = res {
let out = std::io::stdout();
out.lock().write_all(&bytes).unwrap();
}
});
let stderr = tokio::io::reader_stream(attached.stderr().unwrap());
let stderrs = stderr.for_each(|res| async {
if let Ok(bytes) = res {
let out = std::io::stderr();
out.lock().write_all(&bytes).unwrap();
}
});

join!(stdouts, stderrs);

if let Some(status) = attached.await {
println!("{:?}", status);
}
}

#[allow(dead_code)]
async fn combined_output(mut attached: AttachedProcess) {
let stdout = tokio::io::reader_stream(attached.stdout().unwrap());
let stderr = tokio::io::reader_stream(attached.stderr().unwrap());
let outputs = stream::select(stdout, stderr).for_each(|res| async {
if let Ok(bytes) = res {
let out = std::io::stdout();
out.lock().write_all(&bytes).unwrap();
}
});
outputs.await;

if let Some(status) = attached.await {
println!("{:?}", status);
}
}
126 changes: 126 additions & 0 deletions examples/pod_exec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
#[macro_use]
extern crate log;

use futures::{StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::Pod;

use kube::{
api::{Api, AttachParams, AttachedProcess, DeleteParams, ListParams, Meta, PostParams, WatchEvent},
Client,
};
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "info,kube=debug");
env_logger::init();
let client = Client::try_default().await?;
let namespace = std::env::var("NAMESPACE").unwrap_or_else(|_| "default".into());

let p: Pod = serde_json::from_value(serde_json::json!({
"apiVersion": "v1",
"kind": "Pod",
"metadata": { "name": "example" },
"spec": {
"containers": [{
"name": "example",
"image": "alpine",
// Do nothing
"command": ["tail", "-f", "/dev/null"],
}],
}
}))?;

let pods: Api<Pod> = Api::namespaced(client, &namespace);
// Stop on error including a pod already exists or is still being deleted.
pods.create(&PostParams::default(), &p).await?;

// Wait until the pod is running, otherwise we get 500 error.
let lp = ListParams::default().fields("metadata.name=example").timeout(10);
let mut stream = pods.watch(&lp, "0").await?.boxed();
while let Some(status) = stream.try_next().await? {
match status {
WatchEvent::Added(o) => {
info!("Added {}", Meta::name(&o));
}
WatchEvent::Modified(o) => {
let s = o.status.as_ref().expect("status exists on pod");
if s.phase.clone().unwrap_or_default() == "Running" {
info!("Ready to attach to {}", Meta::name(&o));
break;
}
}
_ => {}
}
}

// These examples are mostly taken from Python client's integration tests.
{
let attached = pods
.exec(
"example",
vec!["sh", "-c", "for i in $(seq 1 3); do date; done"],
&AttachParams::default().stderr(false),
)
.await?;
let output = get_output(attached).await;
println!("{}", output);
assert_eq!(output.lines().count(), 3);
}

{
let attached = pods
.exec("example", vec!["uptime"], &AttachParams::default().stderr(false))
.await?;
let output = get_output(attached).await;
println!("{}", output);
assert_eq!(output.lines().count(), 1);
}

// Stdin example
{
let mut attached = pods
.exec(
"example",
vec!["sh"],
&AttachParams::default().stdin(true).stderr(false),
)
.await?;
let mut stdin_writer = attached.stdin().unwrap();
let mut stdout_stream = tokio::io::reader_stream(attached.stdout().unwrap());
let next_stdout = stdout_stream.next();
stdin_writer.write(b"echo test string 1\n").await?;
let stdout = String::from_utf8(next_stdout.await.unwrap().unwrap().to_vec()).unwrap();
println!("{}", stdout);
assert_eq!(stdout, "test string 1\n");

// AttachedProcess resolves with status object.
// Send `exit 1` to get a failure status.
stdin_writer.write(b"exit 1\n").await?;
if let Some(status) = attached.await {
println!("{:?}", status);
assert_eq!(status.status, Some("Failure".to_owned()));
assert_eq!(status.reason, Some("NonZeroExitCode".to_owned()));
}
}

// Delete it
pods.delete("example", &DeleteParams::default())
.await?
.map_left(|pdel| {
assert_eq!(Meta::name(&pdel), "example");
});

Ok(())
}

async fn get_output(mut attached: AttachedProcess) -> String {
let stdout = tokio::io::reader_stream(attached.stdout().unwrap());
let out = stdout
.filter_map(|r| async { r.ok().and_then(|v| String::from_utf8(v.to_vec()).ok()) })
.collect::<Vec<_>>()
.await
.join("");
attached.await;
out
}
97 changes: 97 additions & 0 deletions examples/pod_shell.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#[macro_use]
extern crate log;

use futures::{StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::Pod;

use kube::{
api::{Api, AttachParams, DeleteParams, ListParams, Meta, PostParams, WatchEvent},
Client,
};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "info,kube=debug");
env_logger::init();
let client = Client::try_default().await?;
let namespace = std::env::var("NAMESPACE").unwrap_or_else(|_| "default".into());

let p: Pod = serde_json::from_value(serde_json::json!({
"apiVersion": "v1",
"kind": "Pod",
"metadata": { "name": "example" },
"spec": {
"containers": [{
"name": "example",
"image": "alpine",
// Do nothing
"command": ["tail", "-f", "/dev/null"],
}],
}
}))?;

let pods: Api<Pod> = Api::namespaced(client, &namespace);
// Stop on error including a pod already exists or is still being deleted.
pods.create(&PostParams::default(), &p).await?;

// Wait until the pod is running, otherwise we get 500 error.
let lp = ListParams::default().fields("metadata.name=example").timeout(10);
let mut stream = pods.watch(&lp, "0").await?.boxed();
while let Some(status) = stream.try_next().await? {
match status {
WatchEvent::Added(o) => {
info!("Added {}", Meta::name(&o));
}
WatchEvent::Modified(o) => {
let s = o.status.as_ref().expect("status exists on pod");
if s.phase.clone().unwrap_or_default() == "Running" {
info!("Ready to attach to {}", Meta::name(&o));
break;
}
}
_ => {}
}
}

// Piping current stdin/stdout
{
let mut attached = pods
.exec(
"example",
vec!["sh"],
&AttachParams::default()
.stdin(true)
.stdout(true)
.stderr(false)
.tty(true),
)
.await?;
let mut stdin_writer = attached.stdin().unwrap();
let mut stdout_reader = attached.stdout().unwrap();
// > For interactive uses, it is recommended to spawn a thread dedicated to user input and use blocking IO directly in that thread.
// > https://docs.rs/tokio/0.2.24/tokio/io/fn.stdin.html
let mut stdin = tokio::io::stdin();
let mut stdout = tokio::io::stdout();
// pipe current stdin to the stdin writer from ws
tokio::spawn(async move {
tokio::io::copy(&mut stdin, &mut stdin_writer).await;
});
// pipe stdout from ws to current stdout
tokio::spawn(async move {
tokio::io::copy(&mut stdout_reader, &mut stdout).await;
});
// When done, type `exit\n` to end it, so the pod is deleted.
let status = attached.await;
println!("{:?}", status);
}

// Delete it
println!("deleting");
pods.delete("example", &DeleteParams::default())
.await?
.map_left(|pdel| {
assert_eq!(Meta::name(&pdel), "example");
});

Ok(())
}
Loading