From 974f9ebbc0d79394e5939359295fbe6d32ce9396 Mon Sep 17 00:00:00 2001 From: Taylor Thomas Date: Tue, 21 Apr 2020 16:20:02 -0600 Subject: [PATCH] feat(*): Adds automatic token refreshing This is by no means elegant, but seems to be functional. Setting the authorization headers now occurs at request time instead of during initial configuration. It will only try to refresh a token if the configured expiration time has passed. Closes #72 --- kube/Cargo.toml | 2 +- kube/src/client/mod.rs | 11 ++- kube/src/config/mod.rs | 152 ++++++++++++++++++++++++++++------------- 3 files changed, 116 insertions(+), 49 deletions(-) diff --git a/kube/Cargo.toml b/kube/Cargo.toml index e6c8e4fc4..9af8e3934 100644 --- a/kube/Cargo.toml +++ b/kube/Cargo.toml @@ -32,7 +32,7 @@ openssl = { version = "0.10.29", optional = true } rustls = { version = "0.17.0", optional = true } bytes = "0.5.4" Inflector = "0.11.4" -tokio = { version = "0.2.17", features = ["time", "signal"] } +tokio = { version = "0.2.17", features = ["time", "signal", "sync"] } [dependencies.reqwest] version = "0.10.4" diff --git a/kube/src/client/mod.rs b/kube/src/client/mod.rs index eed38b748..6af43ae26 100644 --- a/kube/src/client/mod.rs +++ b/kube/src/client/mod.rs @@ -33,6 +33,7 @@ pub struct Client { cluster_url: reqwest::Url, default_ns: String, inner: reqwest::Client, + config: Config, } impl Client { @@ -72,6 +73,12 @@ impl Client { let uri_str = self.cluster_url.join(pandq.as_str())?; //trace!("Sending request => method = {} uri = {}", parts.method, uri_str); + let mut headers = parts.headers; + // If we have auth headers set, make sure they are updated and attached to the request + if let Some(auth_header) = self.config.get_auth_header().await? { + headers.insert(reqwest::header::AUTHORIZATION, auth_header); + } + let request = match parts.method { http::Method::GET | http::Method::POST @@ -81,7 +88,7 @@ impl Client { other => return Err(Error::InvalidMethod(other.to_string())), }; - let req = request.headers(parts.headers).body(body).build()?; + let req = request.headers(headers).body(body).build()?; let res = self.inner.execute(req).await?; Ok(res) } @@ -299,11 +306,13 @@ impl TryFrom for Client { fn try_from(config: Config) -> Result { let cluster_url = config.cluster_url.clone(); let default_ns = config.default_ns.clone(); + let config_clone = config.clone(); let builder: reqwest::ClientBuilder = config.into(); Ok(Self { cluster_url, default_ns, inner: builder.build()?, + config: config_clone, }) } } diff --git a/kube/src/config/mod.rs b/kube/src/config/mod.rs index 9c487bc31..13499c6a3 100644 --- a/kube/src/config/mod.rs +++ b/kube/src/config/mod.rs @@ -14,10 +14,26 @@ use crate::{Error, Result}; pub use file_loader::KubeConfigOptions; use file_loader::{ConfigLoader, Der}; +use chrono::{DateTime, Utc}; use reqwest::header::{self, HeaderMap}; +use tokio::sync::Mutex; +use std::sync::Arc; use std::time::Duration; +#[derive(Debug, Clone)] +pub(crate) struct AuthHeader { + value: String, + expiration: Option>, +} + +impl AuthHeader { + fn to_header(&self) -> Result { + header::HeaderValue::from_str(&self.value) + .map_err(|e| Error::Kubeconfig(format!("Invalid bearer token: {}", e))) + } +} + /// Configuration object detailing things like cluster_url, default namespace, root certificates, and timeouts #[derive(Debug, Clone)] pub struct Config { @@ -41,6 +57,9 @@ pub struct Config { /// This is stored in a raw buffer form so that Config can implement `Clone` /// (since [`reqwest::Identity`] does not currently implement `Clone`) pub(crate) identity: Option<(Vec, String)>, + pub(crate) auth_header: Option>>, + + loader: Option, } impl Config { @@ -58,6 +77,8 @@ impl Config { timeout: DEFAULT_TIMEOUT, accept_invalid_certs: false, identity: None, + auth_header: None, + loader: None, } } @@ -101,22 +122,21 @@ impl Config { let token = incluster_config::load_token() .map_err(|e| Error::Kubeconfig(format!("Unable to load in cluster token: {}", e)))?; - - let mut headers = HeaderMap::new(); - headers.insert( - header::AUTHORIZATION, - header::HeaderValue::from_str(&format!("Bearer {}", token)) - .map_err(|e| Error::Kubeconfig(format!("Invalid bearer token: {}", e)))?, - ); + let token = AuthHeader { + value: format!("Bearer {}", token), + expiration: None, + }; Ok(Self { cluster_url, default_ns, root_cert: Some(root_cert), - headers, + headers: HeaderMap::new(), timeout: DEFAULT_TIMEOUT, accept_invalid_certs: false, identity: None, + auth_header: Some(Arc::new(Mutex::new(token))), + loader: None, }) } @@ -134,20 +154,7 @@ impl Config { .clone() .unwrap_or_else(|| String::from("default")); - let token = match &loader.user.token { - Some(token) => Some(token.clone()), - None => { - if let Some(exec) = &loader.user.exec { - let creds = exec::auth_exec(exec)?; - let status = creds.status.ok_or_else(|| { - Error::Kubeconfig("exec-plugin response did not contain a status".into()) - })?; - status.token - } else { - None - } - } - }; + let auth_header = load_auth_header(&loader)?; let mut accept_invalid_certs = false; let mut root_cert = None; @@ -172,41 +179,52 @@ impl Config { } } - let mut headers = HeaderMap::new(); - - match ( - utils::data_or_file(&token, &loader.user.token_file), - (&loader.user.username, &loader.user.password), - ) { - (Ok(token), _) => { - headers.insert( - header::AUTHORIZATION, - header::HeaderValue::from_str(&format!("Bearer {}", token)) - .map_err(|e| Error::Kubeconfig(format!("Invalid bearer token: {}", e)))?, - ); - } - (_, (Some(u), Some(p))) => { - let encoded = base64::encode(&format!("{}:{}", u, p)); - headers.insert( - header::AUTHORIZATION, - header::HeaderValue::from_str(&format!("Basic {}", encoded)) - .map_err(|e| Error::Kubeconfig(format!("Invalid bearer token: {}", e)))?, - ); - } - _ => {} - } - Ok(Self { cluster_url, default_ns, root_cert, - headers, + headers: HeaderMap::new(), timeout: DEFAULT_TIMEOUT, accept_invalid_certs, identity: identity.map(|i| (i, String::from(IDENTITY_PASSWORD))), + auth_header: auth_header.map(|h| Arc::new(Mutex::new(h))), + loader: Some(loader), }) } + async fn needs_refresh(&self) -> bool { + if let Some(header) = self.auth_header.as_ref() { + header + .lock() + .await + .expiration + // Add some wiggle room onto the current timestamp so we don't get any race + // conditions where the token expires while we are refreshing + .map_or(false, |ex| { + chrono::Utc::now() + chrono::Duration::seconds(60) >= ex + }) + } else { + false + } + } + + pub(crate) async fn get_auth_header(&self) -> Result> { + if self.needs_refresh().await { + if let Some(loader) = self.loader.as_ref() { + if let (Some(current_header), Some(new_header)) = + (self.auth_header.as_ref(), load_auth_header(loader)?) + { + *current_header.lock().await = new_header; + } + } + } + let header = match self.auth_header.as_ref() { + Some(h) => Some(h.lock().await.to_header()?), + None => None, + }; + Ok(header) + } + // The identity functions are used to parse the stored identity buffer // into an `reqwest::Identity` type. We do this because `reqwest::Identity` // is not `Clone`. This allows us to store and clone the buffer and supply @@ -233,6 +251,46 @@ impl Config { } } +fn load_auth_header(loader: &ConfigLoader) -> Result> { + let (raw_token, expiration) = match &loader.user.token { + Some(token) => (Some(token.clone()), None), + None => { + if let Some(exec) = &loader.user.exec { + let creds = exec::auth_exec(exec)?; + let status = creds.status.ok_or_else(|| { + Error::Kubeconfig("exec-plugin response did not contain a status".into()) + })?; + let expiration = match status.expiration_timestamp { + Some(ts) => Some(ts.parse::>().map_err(|e| { + Error::Kubeconfig(format!("Malformed expriation date on token: {}", e)) + })?), + None => None, + }; + (status.token, expiration) + } else { + (None, None) + } + } + }; + match ( + utils::data_or_file(&raw_token, &loader.user.token_file), + (&loader.user.username, &loader.user.password), + ) { + (Ok(token), _) => Ok(Some(AuthHeader { + value: format!("Bearer {}", token), + expiration, + })), + (_, (Some(u), Some(p))) => { + let encoded = base64::encode(&format!("{}:{}", u, p)); + Ok(Some(AuthHeader { + value: format!("Basic {}", encoded), + expiration: None, + })) + } + _ => Ok(None), + } +} + // https://github.com/clux/kube-rs/issues/146#issuecomment-590924397 const DEFAULT_TIMEOUT: Duration = Duration::from_secs(295); const IDENTITY_PASSWORD: &str = " ";