Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove kube::Service #532

Merged
merged 2 commits into from
May 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 126 additions & 13 deletions kube/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,35 @@
//! interaction with the kuberneres API.
pub mod discovery;

use crate::{api::WatchEvent, config::Config, error::ErrorResponse, service::Service, Error, Result};

#[cfg(feature = "ws")]
use tokio_tungstenite::{tungstenite as ws, WebSocketStream};
use std::convert::{TryFrom, TryInto};

use bytes::Bytes;
use either::{Either, Left, Right};
use futures::{self, Stream, StreamExt, TryStream, TryStreamExt};
use http::{self, Request, Response, StatusCode};
use http::{self, HeaderValue, Request, Response, StatusCode};
use hyper::Body;
use hyper_timeout::TimeoutConnector;
use k8s_openapi::apimachinery::pkg::apis::meta::v1 as k8s_meta_v1;
use kube_core::response::Status;
use serde::de::DeserializeOwned;
use serde_json::{self, Value};
#[cfg(feature = "ws")]
use tokio_tungstenite::{tungstenite as ws, WebSocketStream};
use tokio_util::{
codec::{FramedRead, LinesCodec, LinesCodecError},
io::StreamReader,
};
use tower::{Service as _, ServiceExt};
use tower::{buffer::Buffer, util::BoxService, BoxError, Service, ServiceBuilder, ServiceExt};

use std::convert::{TryFrom, TryInto};

#[cfg(feature = "gzip")]
use crate::service::{accept_compressed, maybe_decompress};
use crate::{
api::WatchEvent,
error::{ConfigError, ErrorResponse},
service::{set_cluster_url, set_default_headers, AuthLayer, Authentication, HttpsConnector, LogRequest},
Config, Error, Result,
};

// Binary subprotocol v4. See `Client::connect`.
#[cfg(feature = "ws")]
Expand All @@ -41,15 +49,23 @@ const WS_PROTOCOL: &str = "v4.channel.k8s.io";
/// using [`Client::try_from`].
#[derive(Clone)]
pub struct Client {
inner: Service,
// - `Buffer` for cheap clone
// - `BoxService` for dynamic response future type
inner: Buffer<BoxService<Request<Body>, Response<Body>, BoxError>, Request<Body>>,
}

impl Client {
/// Create and initialize a [`Client`] using the given `Service`.
///
/// Use [`Client::try_from`](Self::try_from) to create with a [`Config`].
pub fn new(service: Service) -> Self {
Self { inner: service }
pub fn new<S>(service: S) -> Self
where
S: Service<Request<Body>, Response = Response<Body>, Error = BoxError> + Send + 'static,
S::Future: Send + 'static,
{
Self {
inner: Buffer::new(BoxService::new(service), 1024),
clux marked this conversation as resolved.
Show resolved Hide resolved
}
}

/// Create and initialize a [`Client`] using the inferred
Expand All @@ -63,8 +79,7 @@ impl Client {
/// If you already have a [`Config`] then use [`Client::try_from`](Self::try_from)
/// instead.
pub async fn try_default() -> Result<Self> {
let client_config = Config::infer().await?;
Self::try_from(client_config)
Self::try_from(Config::infer().await?)
}

async fn send(&self, request: Request<Body>) -> Result<Response<Body>> {
Expand Down Expand Up @@ -348,7 +363,60 @@ impl TryFrom<Config> for Client {

/// Convert [`Config`] into a [`Client`]
fn try_from(config: Config) -> Result<Self> {
Ok(Self::new(config.try_into()?))
let cluster_url = config.cluster_url.clone();
let mut default_headers = config.headers.clone();
let timeout = config.timeout;

// AuthLayer is not necessary unless `RefreshableToken`
let maybe_auth = match Authentication::try_from(&config.auth_info)? {
Authentication::None => None,
Authentication::Basic(s) => {
let mut value =
HeaderValue::try_from(format!("Basic {}", &s)).map_err(ConfigError::InvalidBasicAuth)?;
value.set_sensitive(true);
default_headers.insert(http::header::AUTHORIZATION, value);
None
}
Authentication::Token(s) => {
let mut value = HeaderValue::try_from(format!("Bearer {}", &s))
.map_err(ConfigError::InvalidBearerToken)?;
value.set_sensitive(true);
default_headers.insert(http::header::AUTHORIZATION, value);
None
}
Authentication::RefreshableToken(r) => Some(AuthLayer::new(r)),
};

let common = ServiceBuilder::new()
.map_request(move |r| set_cluster_url(r, &cluster_url))
.map_request(move |r| set_default_headers(r, default_headers.clone()))
.into_inner();

#[cfg(feature = "gzip")]
let common = ServiceBuilder::new()
.layer(common)
.map_request(accept_compressed)
.map_response(maybe_decompress)
.into_inner();

let https: HttpsConnector<_> = config.try_into()?;
let mut connector = TimeoutConnector::new(https);
if let Some(timeout) = timeout {
// reqwest's timeout is applied from when the request stars connecting until
// the response body has finished.
// Setting both connect and read timeout should be close enough.
connector.set_connect_timeout(Some(timeout));
// Timeout for reading the response.
connector.set_read_timeout(Some(timeout));
}
let client: hyper::Client<_, Body> = hyper::Client::builder().build(connector);

let inner = ServiceBuilder::new()
.layer(common)
.option_layer(maybe_auth)
.layer(tower::layer::layer_fn(LogRequest::new))
.service(client);
Ok(Self::new(inner))
}
}

Expand Down Expand Up @@ -407,3 +475,48 @@ fn sec_websocket_key() -> String {
let r: [u8; 16] = rand::random();
base64::encode(&r)
}

#[cfg(test)]
mod tests {
use crate::{Api, Client};

use futures::pin_mut;
use http::{Request, Response};
use hyper::Body;
use k8s_openapi::api::core::v1::Pod;
use tower_test::mock;

#[tokio::test]
async fn test_mock() {
let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();
let spawned = tokio::spawn(async move {
// Receive a request for pod and respond with some data
pin_mut!(handle);
let (request, send) = handle.next_request().await.expect("service not called");
assert_eq!(request.method(), http::Method::GET);
assert_eq!(request.uri().to_string(), "/api/v1/namespaces/default/pods/test");
let pod: Pod = serde_json::from_value(serde_json::json!({
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"name": "test",
"annotations": { "kube-rs": "test" },
},
"spec": {
"containers": [{ "name": "test", "image": "test-image" }],
}
}))
.unwrap();
send.send_response(
Response::builder()
.body(Body::from(serde_json::to_vec(&pod).unwrap()))
.unwrap(),
);
});

let pods: Api<Pod> = Api::namespaced(Client::new(mock_service), "default");
let pod = pods.get("test").await.unwrap();
assert_eq!(pod.metadata.annotations.unwrap().get("kube-rs").unwrap(), "test");
spawned.await.unwrap();
clux marked this conversation as resolved.
Show resolved Hide resolved
}
}
4 changes: 1 addition & 3 deletions kube/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ cfg_client! {
pub mod api;
pub mod client;
pub mod config;
pub mod service;
pub(crate) mod service;

pub mod error;

Expand All @@ -117,8 +117,6 @@ cfg_client! {
#[doc(inline)]
pub use config::Config;
#[doc(inline)] pub use error::Error;
#[doc(inline)]
pub use service::Service;

/// Convient alias for `Result<T, Error>`
pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down
125 changes: 9 additions & 116 deletions kube/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,119 +7,12 @@ mod log;
mod tls;
mod url;

use self::{log::LogRequest, url::set_cluster_url};
use auth::AuthLayer;
#[cfg(feature = "gzip")] use compression::{accept_compressed, maybe_decompress};
use headers::set_default_headers;
use http::{HeaderValue, Request, Response};
use hyper::{Body, Client as HyperClient};
use hyper_timeout::TimeoutConnector;
use std::convert::{TryFrom, TryInto};
use tls::HttpsConnector;

use tower::{buffer::Buffer, util::BoxService, BoxError, ServiceBuilder};

use crate::{error::ConfigError, Config, Error, Result};
use auth::Authentication;

// - `Buffer` for cheap clone
// - `BoxService` to avoid type parameters in `Client`
type InnerService = Buffer<BoxService<Request<Body>, Response<Body>, BoxError>, Request<Body>>;

#[derive(Clone)]
/// Abstracts how [`Client`](crate::Client) communicates with the Kubernetes API server.
///
/// The default service uses [`hyper::Client`] and can be created from [`Config`](crate::Config)
/// with [`Service::try_from`].
pub struct Service {
inner: InnerService,
}

impl Service {
/// Create a custom `Service`.
pub fn new<S>(inner: S) -> Self
where
S: tower::Service<Request<Body>, Response = Response<Body>, Error = BoxError> + Send + 'static,
S::Future: Send + 'static,
{
Self {
inner: Buffer::new(BoxService::new(inner), 1024),
}
}
}

impl tower::Service<Request<Body>> for Service {
type Error = <InnerService as tower::Service<Request<Body>>>::Error;
type Future = <InnerService as tower::Service<Request<Body>>>::Future;
type Response = <InnerService as tower::Service<Request<Body>>>::Response;

fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, req: Request<Body>) -> Self::Future {
self.inner.call(req)
}
}

impl TryFrom<Config> for Service {
type Error = Error;

/// Convert [`Config`] into a [`Service`]
fn try_from(config: Config) -> Result<Self> {
let cluster_url = config.cluster_url.clone();
let mut default_headers = config.headers.clone();
let timeout = config.timeout;

// AuthLayer is not necessary unless `RefreshableToken`
let maybe_auth = match Authentication::try_from(&config.auth_info)? {
Authentication::None => None,
Authentication::Basic(s) => {
let mut value =
HeaderValue::try_from(format!("Basic {}", &s)).map_err(ConfigError::InvalidBasicAuth)?;
value.set_sensitive(true);
default_headers.insert(http::header::AUTHORIZATION, value);
None
}
Authentication::Token(s) => {
let mut value = HeaderValue::try_from(format!("Bearer {}", &s))
.map_err(ConfigError::InvalidBearerToken)?;
value.set_sensitive(true);
default_headers.insert(http::header::AUTHORIZATION, value);
None
}
Authentication::RefreshableToken(r) => Some(AuthLayer::new(r)),
};

let common = ServiceBuilder::new()
.map_request(move |r| set_cluster_url(r, &cluster_url))
.map_request(move |r| set_default_headers(r, default_headers.clone()))
.into_inner();

#[cfg(feature = "gzip")]
let common = ServiceBuilder::new()
.layer(common)
.map_request(accept_compressed)
.map_response(maybe_decompress)
.into_inner();

let https: HttpsConnector<_> = config.try_into()?;
let mut connector = TimeoutConnector::new(https);
if let Some(timeout) = timeout {
// reqwest's timeout is applied from when the request stars connecting until
// the response body has finished.
// Setting both connect and read timeout should be close enough.
connector.set_connect_timeout(Some(timeout));
// Timeout for reading the response.
connector.set_read_timeout(Some(timeout));
}
let client: HyperClient<_, Body> = HyperClient::builder().build(connector);

let inner = ServiceBuilder::new()
.layer(common)
.option_layer(maybe_auth)
.layer(tower::layer::layer_fn(LogRequest::new))
.service(client);
Ok(Self::new(inner))
}
}
#[cfg(feature = "gzip")]
pub(crate) use self::compression::{accept_compressed, maybe_decompress};
pub(crate) use self::{
auth::{AuthLayer, Authentication},
headers::set_default_headers,
log::LogRequest,
tls::HttpsConnector,
url::set_cluster_url,
};