Skip to content

Commit

Permalink
chore: Merge branch 'main' into feat/derive-typed-scale-arg
Browse files Browse the repository at this point in the history
  • Loading branch information
Techassi committed Feb 17, 2025
2 parents 696d389 + 07b7891 commit 57e9c4e
Show file tree
Hide file tree
Showing 18 changed files with 199 additions and 101 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/clippy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ jobs:
- uses: dtolnay/rust-toolchain@nightly
with:
components: clippy
- uses: clechasseur/rs-clippy-check@v3
- uses: clechasseur/rs-clippy-check@v4
with:
args: --workspace
5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ assert-json-diff = "2.0.2"
async-broadcast = "0.7.0"
async-stream = "0.3.5"
async-trait = "0.1.64"
backoff = "0.4.0"
backon = "1.3"
base64 = "0.22.1"
bytes = "1.1.0"
chrono = { version = "0.4.34", default-features = false }
Expand Down Expand Up @@ -68,9 +68,8 @@ pem = "3.0.1"
pin-project = "1.0.4"
proc-macro2 = "1.0.29"
quote = "1.0.10"
rand = "0.8.3"
rand = "0.9.0"
rustls = { version = "0.23.16", default-features = false }
rustls-pemfile = "2.0.0"
schemars = "0.8.6"
secrecy = "0.10.2"
serde = "1.0.130"
Expand Down
14 changes: 14 additions & 0 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,17 @@ name = "thiserror-impl"
name = "security-framework"
[[bans.skip]]
name = "core-foundation"

# currently tungstenite hasn't upgraded rand to 0.9 yet, all these are related
[[bans.skip]]
name = "rand"
[[bans.skip]]
name = "rand_core"
[[bans.skip]]
name = "rand_chacha"
[[bans.skip]]
name = "getrandom"
[[bans.skip]]
name = "wasi"
[[bans.skip]]
name = "zerocopy"
4 changes: 2 additions & 2 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ latest = ["k8s-openapi/latest"]
[dev-dependencies]
tokio-util.workspace = true
assert-json-diff.workspace = true
garde = { version = "0.21.0", default-features = false, features = ["derive"] }
garde = { version = "0.22.0", default-features = false, features = ["derive"] }
anyhow.workspace = true
futures = { workspace = true, features = ["async-await"] }
jsonpath-rust.workspace = true
Expand Down Expand Up @@ -52,7 +52,7 @@ tower-http = { workspace = true, features = ["trace", "decompression-gzip"] }
hyper = { workspace = true, features = ["client", "http1"] }
hyper-util = { workspace = true, features = ["client-legacy", "http1", "tokio"] }
thiserror.workspace = true
backoff.workspace = true
backon.workspace = true
clap = { version = "4.0", default-features = false, features = ["std", "cargo", "derive"] }
edit = "0.1.3"
tokio-stream = { version = "0.1.9", features = ["net"] }
Expand Down
6 changes: 2 additions & 4 deletions kube-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ categories = ["web-programming::http-client", "network-programming", "api-bindin

[features]
default = ["client"]
rustls-tls = ["rustls", "rustls-pemfile", "hyper-rustls", "hyper-http-proxy?/rustls-tls-native-roots"]
rustls-tls = ["rustls", "hyper-rustls", "hyper-http-proxy?/rustls-tls-native-roots"]
webpki-roots = ["hyper-rustls/webpki-roots"]
aws-lc-rs = ["rustls?/aws-lc-rs"]
openssl-tls = ["openssl", "hyper-openssl"]
ws = ["client", "tokio-tungstenite", "rand", "kube-core/ws", "tokio/macros"]
ws = ["client", "tokio-tungstenite", "kube-core/ws", "tokio/macros"]
kubelet-debug = ["ws", "kube-core/kubelet-debug"]
oauth = ["client", "tame-oauth"]
oidc = ["client", "form_urlencoded"]
Expand Down Expand Up @@ -57,7 +57,6 @@ futures = { workspace = true, optional = true, features = ["std"] }
pem = { workspace = true, optional = true }
openssl = { workspace = true, optional = true }
rustls = { workspace = true, optional = true }
rustls-pemfile = { workspace = true, optional = true }
bytes = { workspace = true, optional = true }
tokio = { workspace = true, features = ["time", "signal", "sync"], optional = true }
kube-core = { path = "../kube-core", version = "=0.98.0" }
Expand All @@ -73,7 +72,6 @@ tower = { workspace = true, features = ["buffer", "filter", "util"], optional =
tower-http = { workspace = true, features = ["auth", "map-response-body", "trace"], optional = true }
hyper-timeout = { workspace = true, optional = true }
tame-oauth = { workspace = true, features = ["gcp"], optional = true }
rand = { workspace = true, optional = true }
secrecy = { workspace = true }
tracing = { workspace = true, features = ["log"], optional = true }
hyper-openssl = { workspace = true, features = ["client-legacy"], optional = true }
Expand Down
2 changes: 1 addition & 1 deletion kube-client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl Client {
http::header::SEC_WEBSOCKET_VERSION,
HeaderValue::from_static("13"),
);
let key = upgrade::sec_websocket_key();
let key = tokio_tungstenite::tungstenite::handshake::client::generate_key();
parts.headers.insert(
http::header::SEC_WEBSOCKET_KEY,
key.parse().expect("valid header value"),
Expand Down
19 changes: 8 additions & 11 deletions kube-client/src/client/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub mod rustls_tls {
pub enum Error {
/// Identity PEM is invalid
#[error("identity PEM is invalid: {0}")]
InvalidIdentityPem(#[source] std::io::Error),
InvalidIdentityPem(#[source] rustls::pki_types::pem::Error),

/// Identity PEM is missing a private key: the key must be PKCS8 or RSA/PKCS1
#[error("identity PEM is missing a private key: the key must be PKCS8 or RSA/PKCS1")]
Expand Down Expand Up @@ -96,22 +96,19 @@ pub mod rustls_tls {
}

fn client_auth(data: &[u8]) -> Result<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>), Error> {
use rustls_pemfile::Item;
use rustls::pki_types::pem::{self, SectionKind};

let mut cert_chain = Vec::new();
let mut pkcs8_key = None;
let mut pkcs1_key = None;
let mut sec1_key = None;
let mut reader = std::io::Cursor::new(data);
for item in rustls_pemfile::read_all(&mut reader)
.collect::<Result<Vec<_>, _>>()
.map_err(Error::InvalidIdentityPem)?
{
match item {
Item::X509Certificate(cert) => cert_chain.push(cert),
Item::Pkcs8Key(key) => pkcs8_key = Some(PrivateKeyDer::Pkcs8(key)),
Item::Pkcs1Key(key) => pkcs1_key = Some(PrivateKeyDer::from(key)),
Item::Sec1Key(key) => sec1_key = Some(PrivateKeyDer::from(key)),
while let Some((kind, der)) = pem::from_buf(&mut reader).map_err(Error::InvalidIdentityPem)? {
match kind {
SectionKind::Certificate => cert_chain.push(der.into()),
SectionKind::PrivateKey => pkcs8_key = Some(PrivateKeyDer::Pkcs8(der.into())),
SectionKind::RsaPrivateKey => pkcs1_key = Some(PrivateKeyDer::Pkcs1(der.into())),
SectionKind::EcPrivateKey => sec1_key = Some(PrivateKeyDer::Sec1(der.into())),
_ => return Err(Error::UnknownPrivateKeyFormat),
}
}
Expand Down
8 changes: 0 additions & 8 deletions kube-client/src/client/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,3 @@ pub fn verify_response(res: &Response<Body>, key: &str) -> Result<(), UpgradeCon

Ok(())
}

/// Generate a random key for the `Sec-WebSocket-Key` header.
/// This must be nonce consisting of a randomly selected 16-byte value in base64.
pub fn sec_websocket_key() -> String {
use base64::Engine;
let r: [u8; 16] = rand::random();
base64::engine::general_purpose::STANDARD.encode(r)
}
2 changes: 1 addition & 1 deletion kube-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ json-patch.workspace = true
jsonptr.workspace = true
serde_json.workspace = true
thiserror.workspace = true
backoff.workspace = true
backon.workspace = true
async-trait.workspace = true
hashbrown.workspace = true
k8s-openapi.workspace = true
Expand Down
7 changes: 4 additions & 3 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ use crate::{
ObjectRef,
},
scheduler::{debounced_scheduler, ScheduleRequest},
utils::{trystream_try_via, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt},
utils::{
trystream_try_via, Backoff, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt,
},
watcher::{self, metadata_watcher, watcher, DefaultBackoff},
};
use backoff::backoff::Backoff;
use educe::Educe;
use futures::{
channel,
Expand Down Expand Up @@ -915,7 +916,7 @@ where
/// The [`default_backoff`](crate::watcher::default_backoff) follows client-go conventions,
/// but can be overridden by calling this method.
#[must_use]
pub fn trigger_backoff(mut self, backoff: impl Backoff + Send + 'static) -> Self {
pub fn trigger_backoff(mut self, backoff: impl Backoff + 'static) -> Self {
self.trigger_backoff = Box::new(backoff);
self
}
Expand Down
4 changes: 2 additions & 2 deletions kube-runtime/src/controller/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ where
Poll::Pending | Poll::Ready(None) => break Poll::Pending,
// The above future never returns Poll::Ready(Some(_)).
_ => unreachable!(),
};
};
}
}

// Try to take a new message that isn't already being processed
// leave the already-processing ones in the queue, so that we can take them once
Expand Down
2 changes: 1 addition & 1 deletion kube-runtime/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ impl Recorder {
.await?;
} else {
events.create(&PostParams::default(), &event).await?;
};
}

{
let mut cache = self.cache.write().await;
Expand Down
6 changes: 3 additions & 3 deletions kube-runtime/src/reflector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ mod tests {
use futures::{stream, StreamExt, TryStreamExt};
use k8s_openapi::{api::core::v1::ConfigMap, apimachinery::pkg::apis::meta::v1::ObjectMeta};
use rand::{
distributions::{Bernoulli, Uniform},
distr::{Bernoulli, Uniform},
Rng,
};
use std::collections::{BTreeMap, HashMap};
Expand Down Expand Up @@ -256,8 +256,8 @@ mod tests {

#[tokio::test]
async fn reflector_store_should_not_contain_duplicates() {
let mut rng = rand::thread_rng();
let item_dist = Uniform::new(0_u8, 100);
let mut rng = rand::rng();
let item_dist = Uniform::new(0_u8, 100).unwrap();
let deleted_dist = Bernoulli::new(0.40).unwrap();
let store_w = store::Writer::default();
let store = store_w.as_reader();
Expand Down
62 changes: 29 additions & 33 deletions kube-runtime/src/utils/backoff_reset_timer.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,40 @@
use std::time::{Duration, Instant};

use backoff::{backoff::Backoff, Clock, SystemClock};
pub trait Backoff: Iterator<Item = Duration> + Send + Sync + Unpin {
/// Resets the internal state to the initial value.
fn reset(&mut self);
}

impl<B: Backoff + ?Sized> Backoff for Box<B> {
fn reset(&mut self) {
let this: &mut B = self;
this.reset()
}
}

/// A [`Backoff`] wrapper that resets after a fixed duration has elapsed.
pub struct ResetTimerBackoff<B, C = SystemClock> {
pub struct ResetTimerBackoff<B: Backoff> {
backoff: B,
clock: C,
last_backoff: Option<Instant>,
reset_duration: Duration,
}

impl<B: Backoff> ResetTimerBackoff<B> {
pub fn new(backoff: B, reset_duration: Duration) -> Self {
Self::new_with_custom_clock(backoff, reset_duration, SystemClock {})
}
}

impl<B: Backoff, C: Clock> ResetTimerBackoff<B, C> {
fn new_with_custom_clock(backoff: B, reset_duration: Duration, clock: C) -> Self {
Self {
backoff,
clock,
last_backoff: None,
reset_duration,
}
}
}

impl<B: Backoff, C: Clock> Backoff for ResetTimerBackoff<B, C> {
fn next_backoff(&mut self) -> Option<Duration> {
impl<B: Backoff> Iterator for ResetTimerBackoff<B> {
type Item = Duration;

fn next(&mut self) -> Option<Duration> {
if let Some(last_backoff) = self.last_backoff {
if self.clock.now() > last_backoff + self.reset_duration {
if tokio::time::Instant::now().into_std() > last_backoff + self.reset_duration {
tracing::debug!(
?last_backoff,
reset_duration = ?self.reset_duration,
Expand All @@ -39,48 +43,40 @@ impl<B: Backoff, C: Clock> Backoff for ResetTimerBackoff<B, C> {
self.backoff.reset();
}
}
self.last_backoff = Some(self.clock.now());
self.backoff.next_backoff()
self.last_backoff = Some(tokio::time::Instant::now().into_std());
self.backoff.next()
}
}

impl<B: Backoff> Backoff for ResetTimerBackoff<B> {
fn reset(&mut self) {
// Do not even bother trying to reset here, since `next_backoff` will take care of this when the timer expires.
self.backoff.reset();
}
}

#[cfg(test)]
mod tests {
use backoff::{backoff::Backoff, Clock};
use tokio::time::advance;

use super::ResetTimerBackoff;
use crate::utils::stream_backoff::tests::LinearBackoff;
use std::time::{Duration, Instant};
use std::time::Duration;

#[tokio::test]
async fn should_reset_when_timer_expires() {
tokio::time::pause();
let mut backoff = ResetTimerBackoff::new_with_custom_clock(
let mut backoff = ResetTimerBackoff::new(
LinearBackoff::new(Duration::from_secs(2)),
Duration::from_secs(60),
TokioClock,
);
assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(2)));
assert_eq!(backoff.next(), Some(Duration::from_secs(2)));
advance(Duration::from_secs(40)).await;
assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(4)));
assert_eq!(backoff.next(), Some(Duration::from_secs(4)));
advance(Duration::from_secs(40)).await;
assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(6)));
assert_eq!(backoff.next(), Some(Duration::from_secs(6)));
advance(Duration::from_secs(80)).await;
assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(2)));
assert_eq!(backoff.next(), Some(Duration::from_secs(2)));
advance(Duration::from_secs(80)).await;
assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(2)));
}

struct TokioClock;

impl Clock for TokioClock {
fn now(&self) -> Instant {
tokio::time::Instant::now().into_std()
}
assert_eq!(backoff.next(), Some(Duration::from_secs(2)));
}
}
2 changes: 1 addition & 1 deletion kube-runtime/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ mod reflect;
mod stream_backoff;
mod watch_ext;

pub use backoff_reset_timer::ResetTimerBackoff;
pub use backoff_reset_timer::{Backoff, ResetTimerBackoff};
pub use event_decode::EventDecode;
pub use event_modify::EventModify;
pub use predicate::{predicates, Predicate, PredicateFilter};
Expand Down
Loading

0 comments on commit 57e9c4e

Please sign in to comment.