Skip to content

Commit

Permalink
feat(*): Adds automatic token refreshing
Browse files Browse the repository at this point in the history
This is by no means elegant, but seems to be functional. Setting the
authorization headers now occurs at request time instead of during
initial configuration. It will only try to refresh a token if the
configured expiration time has passed.

Closes kube-rs#72
  • Loading branch information
thomastaylor312 committed Apr 23, 2020
1 parent 9cc0571 commit c6c19c5
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 49 deletions.
2 changes: 1 addition & 1 deletion kube/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ openssl = { version = "0.10.29", optional = true }
rustls = { version = "0.17.0", optional = true }
bytes = "0.5.4"
Inflector = "0.11.4"
tokio = { version = "0.2.17", features = ["time", "signal"] }
tokio = { version = "0.2.17", features = ["time", "signal", "sync"] }

[dependencies.reqwest]
version = "0.10.4"
Expand Down
11 changes: 10 additions & 1 deletion kube/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub struct Client {
cluster_url: reqwest::Url,
default_ns: String,
inner: reqwest::Client,
config: Config,
}

impl Client {
Expand Down Expand Up @@ -72,6 +73,12 @@ impl Client {
let uri_str = self.cluster_url.join(pandq.as_str())?;
//trace!("Sending request => method = {} uri = {}", parts.method, uri_str);

let mut headers = parts.headers;
// If we have auth headers set, make sure they are updated and attached to the request
if let Some(auth_header) = self.config.get_auth_header().await? {
headers.insert(reqwest::header::AUTHORIZATION, auth_header);
}

let request = match parts.method {
http::Method::GET
| http::Method::POST
Expand All @@ -81,7 +88,7 @@ impl Client {
other => return Err(Error::InvalidMethod(other.to_string())),
};

let req = request.headers(parts.headers).body(body).build()?;
let req = request.headers(headers).body(body).build()?;
let res = self.inner.execute(req).await?;
Ok(res)
}
Expand Down Expand Up @@ -299,11 +306,13 @@ impl TryFrom<Config> for Client {
fn try_from(config: Config) -> Result<Self> {
let cluster_url = config.cluster_url.clone();
let default_ns = config.default_ns.clone();
let config_clone = config.clone();
let builder: reqwest::ClientBuilder = config.into();
Ok(Self {
cluster_url,
default_ns,
inner: builder.build()?,
config: config_clone,
})
}
}
Expand Down
150 changes: 103 additions & 47 deletions kube/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,26 @@ use crate::{Error, Result};
pub use file_loader::KubeConfigOptions;
use file_loader::{ConfigLoader, Der};

use chrono::{DateTime, Utc};
use reqwest::header::{self, HeaderMap};
use tokio::sync::Mutex;

use std::sync::Arc;
use std::time::Duration;

#[derive(Debug, Clone)]
pub(crate) struct AuthHeader {
value: String,
expiration: Option<DateTime<Utc>>,
}

impl AuthHeader {
fn to_header(&self) -> Result<header::HeaderValue> {
header::HeaderValue::from_str(&self.value)
.map_err(|e| Error::Kubeconfig(format!("Invalid bearer token: {}", e)))
}
}

/// Configuration object detailing things like cluster_url, default namespace, root certificates, and timeouts
#[derive(Debug, Clone)]
pub struct Config {
Expand All @@ -41,6 +57,9 @@ pub struct Config {
/// This is stored in a raw buffer form so that Config can implement `Clone`
/// (since [`reqwest::Identity`] does not currently implement `Clone`)
pub(crate) identity: Option<(Vec<u8>, String)>,
pub(crate) auth_header: Option<Arc<Mutex<AuthHeader>>>,

This comment has been minimized.

Copy link
@rylev

rylev Apr 24, 2020

Just thinking out loud here:
Would it be better to model auth like this:

enum Authentication {
    None,
    Basic(String),
    Token(String),
    RefreshableToken(Arc<Mutex<(String, DateTime<Utc>)>>, Loader)
}

And then remove loader as an explicit field.

I would store just the tokens in the enum and then have a as_header(&self) -> Option<String> method which builds the right header values for each.

It would probably make things more verbose, but it does enumerate the possibilities and you can't possibly have a Config with None for loader when it needs one to refresh the token.

This comment has been minimized.

Copy link
@thomastaylor312

thomastaylor312 Apr 24, 2020

Author Owner

That is a pretty smart idea. Let me give it a whirl


loader: Option<ConfigLoader>,
}

impl Config {
Expand All @@ -58,6 +77,8 @@ impl Config {
timeout: DEFAULT_TIMEOUT,
accept_invalid_certs: false,
identity: None,
auth_header: None,
loader: None,
}
}

Expand Down Expand Up @@ -101,22 +122,21 @@ impl Config {

let token = incluster_config::load_token()
.map_err(|e| Error::Kubeconfig(format!("Unable to load in cluster token: {}", e)))?;

let mut headers = HeaderMap::new();
headers.insert(
header::AUTHORIZATION,
header::HeaderValue::from_str(&format!("Bearer {}", token))
.map_err(|e| Error::Kubeconfig(format!("Invalid bearer token: {}", e)))?,
);
let token = AuthHeader {
value: format!("Bearer {}", token),
expiration: None,
};

Ok(Self {
cluster_url,
default_ns,
root_cert: Some(root_cert),
headers,
headers: HeaderMap::new(),

This comment has been minimized.

Copy link
@rylev

rylev Apr 24, 2020

Do we need to keep headers around now? Seems like we can just drop this field

This comment has been minimized.

Copy link
@thomastaylor312

thomastaylor312 Apr 24, 2020

Author Owner

I was going to open the PR and drop a comment on asking. I don't think we do, but don't want to break it if there are future plans

timeout: DEFAULT_TIMEOUT,
accept_invalid_certs: false,
identity: None,
auth_header: Some(Arc::new(Mutex::new(token))),
loader: None,
})
}

Expand All @@ -134,20 +154,7 @@ impl Config {
.clone()
.unwrap_or_else(|| String::from("default"));

let token = match &loader.user.token {
Some(token) => Some(token.clone()),
None => {
if let Some(exec) = &loader.user.exec {
let creds = exec::auth_exec(exec)?;
let status = creds.status.ok_or_else(|| {
Error::Kubeconfig("exec-plugin response did not contain a status".into())
})?;
status.token
} else {
None
}
}
};
let auth_header = load_auth_header(&loader)?;

let mut accept_invalid_certs = false;
let mut root_cert = None;
Expand All @@ -172,41 +179,49 @@ impl Config {
}
}

let mut headers = HeaderMap::new();

match (
utils::data_or_file(&token, &loader.user.token_file),
(&loader.user.username, &loader.user.password),
) {
(Ok(token), _) => {
headers.insert(
header::AUTHORIZATION,
header::HeaderValue::from_str(&format!("Bearer {}", token))
.map_err(|e| Error::Kubeconfig(format!("Invalid bearer token: {}", e)))?,
);
}
(_, (Some(u), Some(p))) => {
let encoded = base64::encode(&format!("{}:{}", u, p));
headers.insert(
header::AUTHORIZATION,
header::HeaderValue::from_str(&format!("Basic {}", encoded))
.map_err(|e| Error::Kubeconfig(format!("Invalid bearer token: {}", e)))?,
);
}
_ => {}
}

Ok(Self {
cluster_url,
default_ns,
root_cert,
headers,
headers: HeaderMap::new(),
timeout: DEFAULT_TIMEOUT,
accept_invalid_certs,
identity: identity.map(|i| (i, String::from(IDENTITY_PASSWORD))),
auth_header: auth_header.map(|h| Arc::new(Mutex::new(h))),
loader: Some(loader),
})
}

async fn needs_refresh(&self) -> bool {
if let Some(header) = self.auth_header.as_ref() {
header
.lock()
.await
.expiration
.map_or(false, |ex| chrono::Utc::now() >= ex)

This comment has been minimized.

Copy link
@rylev

rylev Apr 24, 2020

Is this the best policy? Perhaps we want to do a refresh if the expiration is reasonably close? I know some refresh policies that refresh if the token is half way to expiration. With this policy it's pretty easy to hit a race condition where the token is about to expire but isn't expired, we don't refresh, but by the time the call is made, the token as expired.

This comment has been minimized.

Copy link
@thomastaylor312

thomastaylor312 Apr 24, 2020

Author Owner

Yep, you are right. I'll add some wiggle room

} else {
false
}
}

pub(crate) async fn get_auth_header(&self) -> Result<Option<header::HeaderValue>> {
if self.needs_refresh().await {
if let Some(loader) = self.loader.as_ref() {
let new_header = load_auth_header(loader)?;
if let Some(current_header) = self.auth_header.as_ref() {

This comment has been minimized.

Copy link
@rylev

rylev Apr 24, 2020

if let (Some(current_header), Some(new_header)) = (self.auth_header.as_ref(), load_auth_header(loader)?) { 
if let Some(header) = new_header {
*current_header.lock().await = header;
}
}
}
}
let header = match self.auth_header.as_ref() {
Some(h) => Some(h.lock().await.to_header()?),
None => None,
};
Ok(header)
}

// The identity functions are used to parse the stored identity buffer
// into an `reqwest::Identity` type. We do this because `reqwest::Identity`
// is not `Clone`. This allows us to store and clone the buffer and supply
Expand All @@ -233,6 +248,47 @@ impl Config {
}
}

fn load_auth_header(loader: &ConfigLoader) -> Result<Option<AuthHeader>> {
let (raw_token, expiration) = match &loader.user.token {
Some(token) => (Some(token.clone()), None),
None => {
if let Some(exec) = &loader.user.exec {
let creds = exec::auth_exec(exec)?;
let status = creds.status.ok_or_else(|| {
Error::Kubeconfig("exec-plugin response did not contain a status".into())
})?;
let expiration = match status.expiration_timestamp {
Some(ts) => Some(
ts.parse::<DateTime<Utc>>()
.map_err(|e| Error::Kubeconfig(format!("Malformed url: {}", e)))?,

This comment has been minimized.

Copy link
@rylev

rylev Apr 24, 2020

"Malformed expiration date: {}" maybe?

This comment has been minimized.

Copy link
@thomastaylor312

thomastaylor312 Apr 24, 2020

Author Owner

Copy/paste error

),
None => None,
};
(status.token, expiration)
} else {
(None, None)
}
}
};
match (
utils::data_or_file(&raw_token, &loader.user.token_file),
(&loader.user.username, &loader.user.password),
) {
(Ok(token), _) => Ok(Some(AuthHeader {
value: format!("Bearer {}", token),
expiration,
})),
(_, (Some(u), Some(p))) => {
let encoded = base64::encode(&format!("{}:{}", u, p));
Ok(Some(AuthHeader {
value: format!("Basic {}", encoded),
expiration: None,
}))
}
_ => Ok(None),
}
}

// https://github.com/clux/kube-rs/issues/146#issuecomment-590924397
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(295);
const IDENTITY_PASSWORD: &str = " ";
Expand Down

0 comments on commit c6c19c5

Please sign in to comment.