diff --git a/kube/Cargo.toml b/kube/Cargo.toml index e6c8e4fc4..9af8e3934 100644 --- a/kube/Cargo.toml +++ b/kube/Cargo.toml @@ -32,7 +32,7 @@ openssl = { version = "0.10.29", optional = true } rustls = { version = "0.17.0", optional = true } bytes = "0.5.4" Inflector = "0.11.4" -tokio = { version = "0.2.17", features = ["time", "signal"] } +tokio = { version = "0.2.17", features = ["time", "signal", "sync"] } [dependencies.reqwest] version = "0.10.4" diff --git a/kube/src/client/mod.rs b/kube/src/client/mod.rs index eed38b748..6af43ae26 100644 --- a/kube/src/client/mod.rs +++ b/kube/src/client/mod.rs @@ -33,6 +33,7 @@ pub struct Client { cluster_url: reqwest::Url, default_ns: String, inner: reqwest::Client, + config: Config, } impl Client { @@ -72,6 +73,12 @@ impl Client { let uri_str = self.cluster_url.join(pandq.as_str())?; //trace!("Sending request => method = {} uri = {}", parts.method, uri_str); + let mut headers = parts.headers; + // If we have auth headers set, make sure they are updated and attached to the request + if let Some(auth_header) = self.config.get_auth_header().await? { + headers.insert(reqwest::header::AUTHORIZATION, auth_header); + } + let request = match parts.method { http::Method::GET | http::Method::POST @@ -81,7 +88,7 @@ impl Client { other => return Err(Error::InvalidMethod(other.to_string())), }; - let req = request.headers(parts.headers).body(body).build()?; + let req = request.headers(headers).body(body).build()?; let res = self.inner.execute(req).await?; Ok(res) } @@ -299,11 +306,13 @@ impl TryFrom for Client { fn try_from(config: Config) -> Result { let cluster_url = config.cluster_url.clone(); let default_ns = config.default_ns.clone(); + let config_clone = config.clone(); let builder: reqwest::ClientBuilder = config.into(); Ok(Self { cluster_url, default_ns, inner: builder.build()?, + config: config_clone, }) } } diff --git a/kube/src/config/mod.rs b/kube/src/config/mod.rs index 9c487bc31..13499c6a3 100644 --- a/kube/src/config/mod.rs +++ b/kube/src/config/mod.rs @@ -14,10 +14,26 @@ use crate::{Error, Result}; pub use file_loader::KubeConfigOptions; use file_loader::{ConfigLoader, Der}; +use chrono::{DateTime, Utc}; use reqwest::header::{self, HeaderMap}; +use tokio::sync::Mutex; +use std::sync::Arc; use std::time::Duration; +#[derive(Debug, Clone)] +pub(crate) struct AuthHeader { + value: String, + expiration: Option>, +} + +impl AuthHeader { + fn to_header(&self) -> Result { + header::HeaderValue::from_str(&self.value) + .map_err(|e| Error::Kubeconfig(format!("Invalid bearer token: {}", e))) + } +} + /// Configuration object detailing things like cluster_url, default namespace, root certificates, and timeouts #[derive(Debug, Clone)] pub struct Config { @@ -41,6 +57,9 @@ pub struct Config { /// This is stored in a raw buffer form so that Config can implement `Clone` /// (since [`reqwest::Identity`] does not currently implement `Clone`) pub(crate) identity: Option<(Vec, String)>, + pub(crate) auth_header: Option>>, + + loader: Option, } impl Config { @@ -58,6 +77,8 @@ impl Config { timeout: DEFAULT_TIMEOUT, accept_invalid_certs: false, identity: None, + auth_header: None, + loader: None, } } @@ -101,22 +122,21 @@ impl Config { let token = incluster_config::load_token() .map_err(|e| Error::Kubeconfig(format!("Unable to load in cluster token: {}", e)))?; - - let mut headers = HeaderMap::new(); - headers.insert( - header::AUTHORIZATION, - header::HeaderValue::from_str(&format!("Bearer {}", token)) - .map_err(|e| Error::Kubeconfig(format!("Invalid bearer token: {}", e)))?, - ); + let token = AuthHeader { + value: format!("Bearer {}", token), + expiration: None, + }; Ok(Self { cluster_url, default_ns, root_cert: Some(root_cert), - headers, + headers: HeaderMap::new(), timeout: DEFAULT_TIMEOUT, accept_invalid_certs: false, identity: None, + auth_header: Some(Arc::new(Mutex::new(token))), + loader: None, }) } @@ -134,20 +154,7 @@ impl Config { .clone() .unwrap_or_else(|| String::from("default")); - let token = match &loader.user.token { - Some(token) => Some(token.clone()), - None => { - if let Some(exec) = &loader.user.exec { - let creds = exec::auth_exec(exec)?; - let status = creds.status.ok_or_else(|| { - Error::Kubeconfig("exec-plugin response did not contain a status".into()) - })?; - status.token - } else { - None - } - } - }; + let auth_header = load_auth_header(&loader)?; let mut accept_invalid_certs = false; let mut root_cert = None; @@ -172,41 +179,52 @@ impl Config { } } - let mut headers = HeaderMap::new(); - - match ( - utils::data_or_file(&token, &loader.user.token_file), - (&loader.user.username, &loader.user.password), - ) { - (Ok(token), _) => { - headers.insert( - header::AUTHORIZATION, - header::HeaderValue::from_str(&format!("Bearer {}", token)) - .map_err(|e| Error::Kubeconfig(format!("Invalid bearer token: {}", e)))?, - ); - } - (_, (Some(u), Some(p))) => { - let encoded = base64::encode(&format!("{}:{}", u, p)); - headers.insert( - header::AUTHORIZATION, - header::HeaderValue::from_str(&format!("Basic {}", encoded)) - .map_err(|e| Error::Kubeconfig(format!("Invalid bearer token: {}", e)))?, - ); - } - _ => {} - } - Ok(Self { cluster_url, default_ns, root_cert, - headers, + headers: HeaderMap::new(), timeout: DEFAULT_TIMEOUT, accept_invalid_certs, identity: identity.map(|i| (i, String::from(IDENTITY_PASSWORD))), + auth_header: auth_header.map(|h| Arc::new(Mutex::new(h))), + loader: Some(loader), }) } + async fn needs_refresh(&self) -> bool { + if let Some(header) = self.auth_header.as_ref() { + header + .lock() + .await + .expiration + // Add some wiggle room onto the current timestamp so we don't get any race + // conditions where the token expires while we are refreshing + .map_or(false, |ex| { + chrono::Utc::now() + chrono::Duration::seconds(60) >= ex + }) + } else { + false + } + } + + pub(crate) async fn get_auth_header(&self) -> Result> { + if self.needs_refresh().await { + if let Some(loader) = self.loader.as_ref() { + if let (Some(current_header), Some(new_header)) = + (self.auth_header.as_ref(), load_auth_header(loader)?) + { + *current_header.lock().await = new_header; + } + } + } + let header = match self.auth_header.as_ref() { + Some(h) => Some(h.lock().await.to_header()?), + None => None, + }; + Ok(header) + } + // The identity functions are used to parse the stored identity buffer // into an `reqwest::Identity` type. We do this because `reqwest::Identity` // is not `Clone`. This allows us to store and clone the buffer and supply @@ -233,6 +251,46 @@ impl Config { } } +fn load_auth_header(loader: &ConfigLoader) -> Result> { + let (raw_token, expiration) = match &loader.user.token { + Some(token) => (Some(token.clone()), None), + None => { + if let Some(exec) = &loader.user.exec { + let creds = exec::auth_exec(exec)?; + let status = creds.status.ok_or_else(|| { + Error::Kubeconfig("exec-plugin response did not contain a status".into()) + })?; + let expiration = match status.expiration_timestamp { + Some(ts) => Some(ts.parse::>().map_err(|e| { + Error::Kubeconfig(format!("Malformed expriation date on token: {}", e)) + })?), + None => None, + }; + (status.token, expiration) + } else { + (None, None) + } + } + }; + match ( + utils::data_or_file(&raw_token, &loader.user.token_file), + (&loader.user.username, &loader.user.password), + ) { + (Ok(token), _) => Ok(Some(AuthHeader { + value: format!("Bearer {}", token), + expiration, + })), + (_, (Some(u), Some(p))) => { + let encoded = base64::encode(&format!("{}:{}", u, p)); + Ok(Some(AuthHeader { + value: format!("Basic {}", encoded), + expiration: None, + })) + } + _ => Ok(None), + } +} + // https://github.com/clux/kube-rs/issues/146#issuecomment-590924397 const DEFAULT_TIMEOUT: Duration = Duration::from_secs(295); const IDENTITY_PASSWORD: &str = " ";