Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Client configurable #540

Merged
merged 36 commits into from
Jun 5, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
37ad7f0
Expose methods to configure TLS connection
kazk May 25, 2021
196503f
Depend on rustls directly instead of tokio-rustls
kazk May 25, 2021
741df23
Use rustls-pemfile instead of internal module
kazk May 26, 2021
d436112
Add custom client example
kazk May 26, 2021
a0efcfa
Remove `client` from `native-tls` and `rust-tls`
kazk May 26, 2021
cb081ae
Change Service's Error to `Into<BoxError>`
kazk May 26, 2021
93e6c65
Always use `http::Uri` instead of `url::Url`
kazk May 27, 2021
7eb8697
Add `SetBaseUriLayer` to set base URI of requests
kazk May 27, 2021
50634fd
Remove hyper::Body dependency from services
kazk Jun 1, 2021
0d9e8d0
Relax Service's response body type
kazk Jun 1, 2021
6649d97
Replace custom decompression module with `DecompressionLayer`
kazk Jun 1, 2021
992231d
Replace `LogRequest` layer with `TraceLayer`
kazk Jun 1, 2021
977ba63
Split custom client examples
kazk Jun 2, 2021
36d39fd
Switch to custom tracing with callbacks
kazk Jun 2, 2021
ef0eae0
Improve default tracing
kazk Jun 3, 2021
a3a1dc7
Simplify custom client examples
kazk Jun 3, 2021
7e0c49b
Fix examples having both TLS enabled by default
kazk Jun 3, 2021
891cef1
Clean up body transformer
kazk Jun 3, 2021
055eceb
Replace `set_default_headers` with `SetHeadersLayer`
kazk Jun 3, 2021
25ddfe0
Refactor auth with layers
kazk Jun 3, 2021
a5bb2bc
Add methods to create HttpsConnector directly from Config
kazk Jun 3, 2021
b8ba3bc
Move layers under client
kazk Jun 3, 2021
5e7fc1c
Remove `_tls` suffix from rustls methods
kazk Jun 3, 2021
8536b2a
Pass span name through Extensions and remove instrument in Api
kazk Jun 4, 2021
583cdbb
Refactor the default service stack
kazk Jun 4, 2021
54baa61
Rename Authentication to Auth
kazk Jun 4, 2021
0514408
Add `client::ConfigExt` to extend `Config` for `Client`
kazk Jun 4, 2021
b6f939d
Add `ConfigExt::base_uri_layer`
kazk Jun 4, 2021
2258750
Rename to `RefreshTokenLayer`
kazk Jun 4, 2021
0f0994e
Discourage using middleware directly
kazk Jun 4, 2021
1b08927
Remove `headers` from `Config`
kazk Jun 4, 2021
f009ff6
Support loading proxy URL
kazk Jun 5, 2021
79096a9
Add `ConfigExt::auth_layer` hiding details
kazk Jun 5, 2021
1153536
Document custom client
kazk Jun 5, 2021
8c540c0
Add missing feature tags
kazk Jun 5, 2021
38782b5
Rename to `BaseUriLayer`
kazk Jun 5, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Relax Service's response body type
Allow using more from the Tower ecosystem.
  • Loading branch information
kazk committed Jun 3, 2021
commit 0d9e8d09d00cf76015b4e9a341b306b03516668a
1 change: 1 addition & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ warp = { version = "0.3", features = ["tls"] }
http = "0.2.3"
json-patch = "0.2.6"
tower = { version = "0.4.6" }
tower-http = { version = "0.1.0", features = ["trace", "decompression-gzip"] }
hyper = { version = "0.14.2", features = ["client", "http1", "stream", "tcp"] }

[[example]]
Expand Down
20 changes: 19 additions & 1 deletion examples/custom_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ use serde_json::json;
use tokio_native_tls::TlsConnector;
use tower::ServiceBuilder;

use tower_http::{
decompression::DecompressionLayer,
trace::{DefaultMakeSpan, DefaultOnRequest, DefaultOnResponse, TraceLayer},
LatencyUnit,
};
use tracing::Level;

use kube::{
api::{Api, DeleteParams, ListParams, PostParams, ResourceExt, WatchEvent},
service::SetBaseUriLayer,
Expand All @@ -17,13 +24,24 @@ use kube::{

#[tokio::main]
async fn main() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "info,kube=debug");
std::env::set_var("RUST_LOG", "info,kube=debug,tower_http=trace");
env_logger::init();

let config = Config::infer().await?;
let cluster_url = config.cluster_url.clone();
let common = ServiceBuilder::new()
.layer(SetBaseUriLayer::new(cluster_url))
.layer(DecompressionLayer::new())
.layer(
TraceLayer::new_for_http()
.make_span_with(DefaultMakeSpan::new().include_headers(true))
.on_request(DefaultOnRequest::new().level(Level::INFO))
.on_response(
DefaultOnResponse::new()
.level(Level::INFO)
.latency_unit(LatencyUnit::Micros),
),
)
.into_inner();
let mut http = HttpConnector::new();
http.enforce_http(false);
Expand Down
5 changes: 3 additions & 2 deletions kube/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ rustls-tls = ["rustls", "rustls-pemfile", "hyper-rustls", "webpki"]
ws = ["client", "tokio-tungstenite", "rand", "kube-core/ws"]
oauth = ["client", "tame-oauth"]
gzip = ["client", "async-compression"]
client = ["config", "__non_core", "hyper", "tower", "hyper-timeout", "pin-project", "chrono", "jsonpath_lib", "bytes", "futures", "tokio", "tokio-util", "either"]
client = ["config", "__non_core", "hyper", "tower", "tower-http", "hyper-timeout", "pin-project", "chrono", "jsonpath_lib", "bytes", "futures", "tokio", "tokio-util", "either"]
jsonpatch = ["kube-core/jsonpatch"]
admission = ["kube-core/admission"]
derive = ["kube-derive"]
Expand Down Expand Up @@ -60,11 +60,12 @@ kube-derive = { path = "../kube-derive", version = "^0.55.0", optional = true }
kube-core = { path = "../kube-core", version = "^0.55.0"}
jsonpath_lib = { version = "0.3.0", optional = true }
tokio-util = { version = "0.6.0", optional = true, features = ["io", "codec"] }
hyper = { version = "0.14.2", optional = true, features = ["client", "http1", "stream", "tcp"] }
hyper = { version = "0.14.8", optional = true, features = ["client", "http1", "stream", "tcp"] }
hyper-tls = { version = "0.5.0", optional = true }
hyper-rustls = { version = "0.22.1", optional = true }
tokio-tungstenite = { version = "0.14.0", optional = true }
tower = { version = "0.4.6", optional = true, features = ["buffer", "util"] }
tower-http = { version = "0.1.0", optional = true, features = ["map-response-body"] }
async-compression = { version = "0.3.7", features = ["gzip", "tokio"], optional = true }
hyper-timeout = {version = "0.4.1", optional = true }
tame-oauth = { version = "0.4.7", features = ["gcp"], optional = true }
Expand Down
32 changes: 32 additions & 0 deletions kube/src/client/body.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use std::{
pin::Pin,
task::{Context, Poll},
};

use futures::stream::Stream;
use http_body::Body;
use pin_project::pin_project;

// Wrap `http_body::Body` to implement `Stream`.
#[pin_project]
pub struct IntoStream<B> {
#[pin]
body: B,
}

impl<B> IntoStream<B> {
pub(crate) fn new(body: B) -> Self {
Self { body }
}
}

impl<B> Stream for IntoStream<B>
where
B: Body,
{
type Item = Result<B::Data, B::Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().body.poll_data(cx)
}
}
25 changes: 19 additions & 6 deletions kube/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ use tokio_util::{
codec::{FramedRead, LinesCodec, LinesCodecError},
io::StreamReader,
};
use tower::{buffer::Buffer, util::BoxService, BoxError, Service, ServiceBuilder, ServiceExt};
use tower::{buffer::Buffer, util::BoxService, BoxError, Layer, Service, ServiceBuilder, ServiceExt};
use tower_http::map_response_body::MapResponseBodyLayer;

#[cfg(feature = "gzip")]
use crate::service::{accept_compressed, maybe_decompress};
Expand All @@ -37,6 +38,8 @@ use crate::{
Config, Error, Result,
};

mod body;

// Binary subprotocol v4. See `Client::connect`.
#[cfg(feature = "ws")]
const WS_PROTOCOL: &str = "v4.channel.k8s.io";
Expand All @@ -59,22 +62,32 @@ impl Client {
/// Create and initialize a [`Client`] using the given `Service`.
///
/// Use [`Client::try_from`](Self::try_from) to create with a [`Config`].
pub fn new<S, E: Into<BoxError>>(service: S) -> Self
pub fn new<S, B>(service: S) -> Self
where
S: Service<Request<Body>, Response = Response<Body>, Error = E> + Send + 'static,
S: Service<Request<Body>, Response = Response<B>> + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<BoxError>,
B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
B::Error: std::error::Error + Send + Sync + 'static,
{
Self::new_with_default_ns(service, "default")
}

/// Create and initialize a [`Client`] using the given `Service` and the default namespace.
fn new_with_default_ns<S, E: Into<BoxError>, T: Into<String>>(service: S, default_ns: T) -> Self
fn new_with_default_ns<S, B, T: Into<String>>(service: S, default_ns: T) -> Self
where
S: Service<Request<Body>, Response = Response<Body>, Error = E> + Send + 'static,
S: Service<Request<Body>, Response = Response<B>> + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<BoxError>,
B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
B::Error: std::error::Error + Send + Sync + 'static,
{
// Transform response body to `hyper::Body` and use type erased error to avoid type parameters.
let service = MapResponseBodyLayer::new(|b| hyper::Body::wrap_stream(body::IntoStream::new(b)))
.layer(service)
.map_err(|e| e.into());
Copy link
Member Author

@kazk kazk Jun 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Took me a few days to come up with this, but should be worth it. This change allows us to take advantage of the Tower ecosystem much more without introducing type parameters to Client.

Shouldn't be a breaking change, but need to test more to make sure.

Self {
inner: Buffer::new(BoxService::new(service.map_err(|e| e.into())), 1024),
inner: Buffer::new(BoxService::new(service), 1024),
default_ns: default_ns.into(),
}
}
Expand Down