Skip to content

Commit

Permalink
feat(*): Adds automatic token refreshing
Browse files Browse the repository at this point in the history
This is by no means elegant, but seems to be functional. Setting the
authorization headers now occurs at request time instead of during
initial configuration. It will only try to refresh a token if the
configured expiration time has passed.

Closes kube-rs#72
  • Loading branch information
thomastaylor312 committed Apr 27, 2020
1 parent 09dd6b3 commit 974f9eb
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 49 deletions.
2 changes: 1 addition & 1 deletion kube/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ openssl = { version = "0.10.29", optional = true }
rustls = { version = "0.17.0", optional = true }
bytes = "0.5.4"
Inflector = "0.11.4"
tokio = { version = "0.2.17", features = ["time", "signal"] }
tokio = { version = "0.2.17", features = ["time", "signal", "sync"] }

[dependencies.reqwest]
version = "0.10.4"
Expand Down
11 changes: 10 additions & 1 deletion kube/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub struct Client {
cluster_url: reqwest::Url,
default_ns: String,
inner: reqwest::Client,
config: Config,
}

impl Client {
Expand Down Expand Up @@ -72,6 +73,12 @@ impl Client {
let uri_str = self.cluster_url.join(pandq.as_str())?;
//trace!("Sending request => method = {} uri = {}", parts.method, uri_str);

let mut headers = parts.headers;
// If we have auth headers set, make sure they are updated and attached to the request
if let Some(auth_header) = self.config.get_auth_header().await? {
headers.insert(reqwest::header::AUTHORIZATION, auth_header);
}

let request = match parts.method {
http::Method::GET
| http::Method::POST
Expand All @@ -81,7 +88,7 @@ impl Client {
other => return Err(Error::InvalidMethod(other.to_string())),
};

let req = request.headers(parts.headers).body(body).build()?;
let req = request.headers(headers).body(body).build()?;
let res = self.inner.execute(req).await?;
Ok(res)
}
Expand Down Expand Up @@ -299,11 +306,13 @@ impl TryFrom<Config> for Client {
fn try_from(config: Config) -> Result<Self> {
let cluster_url = config.cluster_url.clone();
let default_ns = config.default_ns.clone();
let config_clone = config.clone();
let builder: reqwest::ClientBuilder = config.into();
Ok(Self {
cluster_url,
default_ns,
inner: builder.build()?,
config: config_clone,
})
}
}
Expand Down
152 changes: 105 additions & 47 deletions kube/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,26 @@ use crate::{Error, Result};
pub use file_loader::KubeConfigOptions;
use file_loader::{ConfigLoader, Der};

use chrono::{DateTime, Utc};
use reqwest::header::{self, HeaderMap};
use tokio::sync::Mutex;

use std::sync::Arc;
use std::time::Duration;

#[derive(Debug, Clone)]
pub(crate) struct AuthHeader {
value: String,
expiration: Option<DateTime<Utc>>,
}

impl AuthHeader {
fn to_header(&self) -> Result<header::HeaderValue> {
header::HeaderValue::from_str(&self.value)
.map_err(|e| Error::Kubeconfig(format!("Invalid bearer token: {}", e)))
}
}

/// Configuration object detailing things like cluster_url, default namespace, root certificates, and timeouts
#[derive(Debug, Clone)]
pub struct Config {
Expand All @@ -41,6 +57,9 @@ pub struct Config {
/// This is stored in a raw buffer form so that Config can implement `Clone`
/// (since [`reqwest::Identity`] does not currently implement `Clone`)
pub(crate) identity: Option<(Vec<u8>, String)>,
pub(crate) auth_header: Option<Arc<Mutex<AuthHeader>>>,

loader: Option<ConfigLoader>,
}

impl Config {
Expand All @@ -58,6 +77,8 @@ impl Config {
timeout: DEFAULT_TIMEOUT,
accept_invalid_certs: false,
identity: None,
auth_header: None,
loader: None,
}
}

Expand Down Expand Up @@ -101,22 +122,21 @@ impl Config {

let token = incluster_config::load_token()
.map_err(|e| Error::Kubeconfig(format!("Unable to load in cluster token: {}", e)))?;

let mut headers = HeaderMap::new();
headers.insert(
header::AUTHORIZATION,
header::HeaderValue::from_str(&format!("Bearer {}", token))
.map_err(|e| Error::Kubeconfig(format!("Invalid bearer token: {}", e)))?,
);
let token = AuthHeader {
value: format!("Bearer {}", token),
expiration: None,
};

Ok(Self {
cluster_url,
default_ns,
root_cert: Some(root_cert),
headers,
headers: HeaderMap::new(),
timeout: DEFAULT_TIMEOUT,
accept_invalid_certs: false,
identity: None,
auth_header: Some(Arc::new(Mutex::new(token))),
loader: None,
})
}

Expand All @@ -134,20 +154,7 @@ impl Config {
.clone()
.unwrap_or_else(|| String::from("default"));

let token = match &loader.user.token {
Some(token) => Some(token.clone()),
None => {
if let Some(exec) = &loader.user.exec {
let creds = exec::auth_exec(exec)?;
let status = creds.status.ok_or_else(|| {
Error::Kubeconfig("exec-plugin response did not contain a status".into())
})?;
status.token
} else {
None
}
}
};
let auth_header = load_auth_header(&loader)?;

let mut accept_invalid_certs = false;
let mut root_cert = None;
Expand All @@ -172,41 +179,52 @@ impl Config {
}
}

let mut headers = HeaderMap::new();

match (
utils::data_or_file(&token, &loader.user.token_file),
(&loader.user.username, &loader.user.password),
) {
(Ok(token), _) => {
headers.insert(
header::AUTHORIZATION,
header::HeaderValue::from_str(&format!("Bearer {}", token))
.map_err(|e| Error::Kubeconfig(format!("Invalid bearer token: {}", e)))?,
);
}
(_, (Some(u), Some(p))) => {
let encoded = base64::encode(&format!("{}:{}", u, p));
headers.insert(
header::AUTHORIZATION,
header::HeaderValue::from_str(&format!("Basic {}", encoded))
.map_err(|e| Error::Kubeconfig(format!("Invalid bearer token: {}", e)))?,
);
}
_ => {}
}

Ok(Self {
cluster_url,
default_ns,
root_cert,
headers,
headers: HeaderMap::new(),
timeout: DEFAULT_TIMEOUT,
accept_invalid_certs,
identity: identity.map(|i| (i, String::from(IDENTITY_PASSWORD))),
auth_header: auth_header.map(|h| Arc::new(Mutex::new(h))),
loader: Some(loader),
})
}

async fn needs_refresh(&self) -> bool {
if let Some(header) = self.auth_header.as_ref() {
header
.lock()
.await
.expiration
// Add some wiggle room onto the current timestamp so we don't get any race
// conditions where the token expires while we are refreshing
.map_or(false, |ex| {
chrono::Utc::now() + chrono::Duration::seconds(60) >= ex
})
} else {
false
}
}

pub(crate) async fn get_auth_header(&self) -> Result<Option<header::HeaderValue>> {
if self.needs_refresh().await {
if let Some(loader) = self.loader.as_ref() {
if let (Some(current_header), Some(new_header)) =
(self.auth_header.as_ref(), load_auth_header(loader)?)
{
*current_header.lock().await = new_header;
}
}
}
let header = match self.auth_header.as_ref() {
Some(h) => Some(h.lock().await.to_header()?),
None => None,
};
Ok(header)
}

// The identity functions are used to parse the stored identity buffer
// into an `reqwest::Identity` type. We do this because `reqwest::Identity`
// is not `Clone`. This allows us to store and clone the buffer and supply
Expand All @@ -233,6 +251,46 @@ impl Config {
}
}

fn load_auth_header(loader: &ConfigLoader) -> Result<Option<AuthHeader>> {
let (raw_token, expiration) = match &loader.user.token {
Some(token) => (Some(token.clone()), None),
None => {
if let Some(exec) = &loader.user.exec {
let creds = exec::auth_exec(exec)?;
let status = creds.status.ok_or_else(|| {
Error::Kubeconfig("exec-plugin response did not contain a status".into())
})?;
let expiration = match status.expiration_timestamp {
Some(ts) => Some(ts.parse::<DateTime<Utc>>().map_err(|e| {
Error::Kubeconfig(format!("Malformed expriation date on token: {}", e))
})?),
None => None,
};
(status.token, expiration)
} else {
(None, None)
}
}
};
match (
utils::data_or_file(&raw_token, &loader.user.token_file),
(&loader.user.username, &loader.user.password),
) {
(Ok(token), _) => Ok(Some(AuthHeader {
value: format!("Bearer {}", token),
expiration,
})),
(_, (Some(u), Some(p))) => {
let encoded = base64::encode(&format!("{}:{}", u, p));
Ok(Some(AuthHeader {
value: format!("Basic {}", encoded),
expiration: None,
}))
}
_ => Ok(None),
}
}

// https://github.com/clux/kube-rs/issues/146#issuecomment-590924397
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(295);
const IDENTITY_PASSWORD: &str = " ";
Expand Down

0 comments on commit 974f9eb

Please sign in to comment.