diff --git a/.github/workflows/clippy.yml b/.github/workflows/clippy.yml index 493b22fc4..a11059080 100644 --- a/.github/workflows/clippy.yml +++ b/.github/workflows/clippy.yml @@ -14,6 +14,6 @@ jobs: - uses: dtolnay/rust-toolchain@nightly with: components: clippy - - uses: clechasseur/rs-clippy-check@v3 + - uses: clechasseur/rs-clippy-check@v4 with: args: --workspace diff --git a/Cargo.toml b/Cargo.toml index 906fece31..62a488ca7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,7 @@ assert-json-diff = "2.0.2" async-broadcast = "0.7.0" async-stream = "0.3.5" async-trait = "0.1.64" -backoff = "0.4.0" +backon = "1.3" base64 = "0.22.1" bytes = "1.1.0" chrono = { version = "0.4.34", default-features = false } @@ -68,9 +68,8 @@ pem = "3.0.1" pin-project = "1.0.4" proc-macro2 = "1.0.29" quote = "1.0.10" -rand = "0.8.3" +rand = "0.9.0" rustls = { version = "0.23.16", default-features = false } -rustls-pemfile = "2.0.0" schemars = "0.8.6" secrecy = "0.10.2" serde = "1.0.130" diff --git a/deny.toml b/deny.toml index 2f1704961..f2d2dfe0c 100644 --- a/deny.toml +++ b/deny.toml @@ -97,3 +97,17 @@ name = "thiserror-impl" name = "security-framework" [[bans.skip]] name = "core-foundation" + +# currently tungstenite hasn't upgraded rand to 0.9 yet, all these are related +[[bans.skip]] +name = "rand" +[[bans.skip]] +name = "rand_core" +[[bans.skip]] +name = "rand_chacha" +[[bans.skip]] +name = "getrandom" +[[bans.skip]] +name = "wasi" +[[bans.skip]] +name = "zerocopy" diff --git a/examples/Cargo.toml b/examples/Cargo.toml index f260bc73b..3abb06521 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -24,7 +24,7 @@ latest = ["k8s-openapi/latest"] [dev-dependencies] tokio-util.workspace = true assert-json-diff.workspace = true -garde = { version = "0.21.0", default-features = false, features = ["derive"] } +garde = { version = "0.22.0", default-features = false, features = ["derive"] } anyhow.workspace = true futures = { workspace = true, features = ["async-await"] } jsonpath-rust.workspace = true @@ -52,7 +52,7 @@ tower-http = { workspace = true, features = ["trace", "decompression-gzip"] } hyper = { workspace = true, features = ["client", "http1"] } hyper-util = { workspace = true, features = ["client-legacy", "http1", "tokio"] } thiserror.workspace = true -backoff.workspace = true +backon.workspace = true clap = { version = "4.0", default-features = false, features = ["std", "cargo", "derive"] } edit = "0.1.3" tokio-stream = { version = "0.1.9", features = ["net"] } diff --git a/kube-client/Cargo.toml b/kube-client/Cargo.toml index 30de09717..d28bf670c 100644 --- a/kube-client/Cargo.toml +++ b/kube-client/Cargo.toml @@ -13,11 +13,11 @@ categories = ["web-programming::http-client", "network-programming", "api-bindin [features] default = ["client"] -rustls-tls = ["rustls", "rustls-pemfile", "hyper-rustls", "hyper-http-proxy?/rustls-tls-native-roots"] +rustls-tls = ["rustls", "hyper-rustls", "hyper-http-proxy?/rustls-tls-native-roots"] webpki-roots = ["hyper-rustls/webpki-roots"] aws-lc-rs = ["rustls?/aws-lc-rs"] openssl-tls = ["openssl", "hyper-openssl"] -ws = ["client", "tokio-tungstenite", "rand", "kube-core/ws", "tokio/macros"] +ws = ["client", "tokio-tungstenite", "kube-core/ws", "tokio/macros"] kubelet-debug = ["ws", "kube-core/kubelet-debug"] oauth = ["client", "tame-oauth"] oidc = ["client", "form_urlencoded"] @@ -57,7 +57,6 @@ futures = { workspace = true, optional = true, features = ["std"] } pem = { workspace = true, optional = true } openssl = { workspace = true, optional = true } rustls = { workspace = true, optional = true } -rustls-pemfile = { workspace = true, optional = true } bytes = { workspace = true, optional = true } tokio = { workspace = true, features = ["time", "signal", "sync"], optional = true } kube-core = { path = "../kube-core", version = "=0.98.0" } @@ -73,7 +72,6 @@ tower = { workspace = true, features = ["buffer", "filter", "util"], optional = tower-http = { workspace = true, features = ["auth", "map-response-body", "trace"], optional = true } hyper-timeout = { workspace = true, optional = true } tame-oauth = { workspace = true, features = ["gcp"], optional = true } -rand = { workspace = true, optional = true } secrecy = { workspace = true } tracing = { workspace = true, features = ["log"], optional = true } hyper-openssl = { workspace = true, features = ["client-legacy"], optional = true } diff --git a/kube-client/src/client/mod.rs b/kube-client/src/client/mod.rs index cd6c9ac9e..9f6dd0a4c 100644 --- a/kube-client/src/client/mod.rs +++ b/kube-client/src/client/mod.rs @@ -206,7 +206,7 @@ impl Client { http::header::SEC_WEBSOCKET_VERSION, HeaderValue::from_static("13"), ); - let key = upgrade::sec_websocket_key(); + let key = tokio_tungstenite::tungstenite::handshake::client::generate_key(); parts.headers.insert( http::header::SEC_WEBSOCKET_KEY, key.parse().expect("valid header value"), diff --git a/kube-client/src/client/tls.rs b/kube-client/src/client/tls.rs index 25bdb737e..c264f5a95 100644 --- a/kube-client/src/client/tls.rs +++ b/kube-client/src/client/tls.rs @@ -14,7 +14,7 @@ pub mod rustls_tls { pub enum Error { /// Identity PEM is invalid #[error("identity PEM is invalid: {0}")] - InvalidIdentityPem(#[source] std::io::Error), + InvalidIdentityPem(#[source] rustls::pki_types::pem::Error), /// Identity PEM is missing a private key: the key must be PKCS8 or RSA/PKCS1 #[error("identity PEM is missing a private key: the key must be PKCS8 or RSA/PKCS1")] @@ -96,22 +96,19 @@ pub mod rustls_tls { } fn client_auth(data: &[u8]) -> Result<(Vec>, PrivateKeyDer<'static>), Error> { - use rustls_pemfile::Item; + use rustls::pki_types::pem::{self, SectionKind}; let mut cert_chain = Vec::new(); let mut pkcs8_key = None; let mut pkcs1_key = None; let mut sec1_key = None; let mut reader = std::io::Cursor::new(data); - for item in rustls_pemfile::read_all(&mut reader) - .collect::, _>>() - .map_err(Error::InvalidIdentityPem)? - { - match item { - Item::X509Certificate(cert) => cert_chain.push(cert), - Item::Pkcs8Key(key) => pkcs8_key = Some(PrivateKeyDer::Pkcs8(key)), - Item::Pkcs1Key(key) => pkcs1_key = Some(PrivateKeyDer::from(key)), - Item::Sec1Key(key) => sec1_key = Some(PrivateKeyDer::from(key)), + while let Some((kind, der)) = pem::from_buf(&mut reader).map_err(Error::InvalidIdentityPem)? { + match kind { + SectionKind::Certificate => cert_chain.push(der.into()), + SectionKind::PrivateKey => pkcs8_key = Some(PrivateKeyDer::Pkcs8(der.into())), + SectionKind::RsaPrivateKey => pkcs1_key = Some(PrivateKeyDer::Pkcs1(der.into())), + SectionKind::EcPrivateKey => sec1_key = Some(PrivateKeyDer::Sec1(der.into())), _ => return Err(Error::UnknownPrivateKeyFormat), } } diff --git a/kube-client/src/client/upgrade.rs b/kube-client/src/client/upgrade.rs index 3bfb4f49b..0e8d7d528 100644 --- a/kube-client/src/client/upgrade.rs +++ b/kube-client/src/client/upgrade.rs @@ -86,11 +86,3 @@ pub fn verify_response(res: &Response, key: &str) -> Result<(), UpgradeCon Ok(()) } - -/// Generate a random key for the `Sec-WebSocket-Key` header. -/// This must be nonce consisting of a randomly selected 16-byte value in base64. -pub fn sec_websocket_key() -> String { - use base64::Engine; - let r: [u8; 16] = rand::random(); - base64::engine::general_purpose::STANDARD.encode(r) -} diff --git a/kube-runtime/Cargo.toml b/kube-runtime/Cargo.toml index 959601641..09975cf5f 100644 --- a/kube-runtime/Cargo.toml +++ b/kube-runtime/Cargo.toml @@ -43,7 +43,7 @@ json-patch.workspace = true jsonptr.workspace = true serde_json.workspace = true thiserror.workspace = true -backoff.workspace = true +backon.workspace = true async-trait.workspace = true hashbrown.workspace = true k8s-openapi.workspace = true diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index e701b1d1b..8a8f7fc3a 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -8,10 +8,11 @@ use crate::{ ObjectRef, }, scheduler::{debounced_scheduler, ScheduleRequest}, - utils::{trystream_try_via, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt}, + utils::{ + trystream_try_via, Backoff, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt, + }, watcher::{self, metadata_watcher, watcher, DefaultBackoff}, }; -use backoff::backoff::Backoff; use educe::Educe; use futures::{ channel, @@ -915,7 +916,7 @@ where /// The [`default_backoff`](crate::watcher::default_backoff) follows client-go conventions, /// but can be overridden by calling this method. #[must_use] - pub fn trigger_backoff(mut self, backoff: impl Backoff + Send + 'static) -> Self { + pub fn trigger_backoff(mut self, backoff: impl Backoff + 'static) -> Self { self.trigger_backoff = Box::new(backoff); self } diff --git a/kube-runtime/src/controller/runner.rs b/kube-runtime/src/controller/runner.rs index ddc87fbc6..697460763 100644 --- a/kube-runtime/src/controller/runner.rs +++ b/kube-runtime/src/controller/runner.rs @@ -121,8 +121,8 @@ where Poll::Pending | Poll::Ready(None) => break Poll::Pending, // The above future never returns Poll::Ready(Some(_)). _ => unreachable!(), - }; - }; + } + } // Try to take a new message that isn't already being processed // leave the already-processing ones in the queue, so that we can take them once diff --git a/kube-runtime/src/events.rs b/kube-runtime/src/events.rs index d62873315..19c683dbf 100644 --- a/kube-runtime/src/events.rs +++ b/kube-runtime/src/events.rs @@ -345,7 +345,7 @@ impl Recorder { .await?; } else { events.create(&PostParams::default(), &event).await?; - }; + } { let mut cache = self.cache.write().await; diff --git a/kube-runtime/src/reflector/mod.rs b/kube-runtime/src/reflector/mod.rs index 068f83a9a..88f4f2910 100644 --- a/kube-runtime/src/reflector/mod.rs +++ b/kube-runtime/src/reflector/mod.rs @@ -141,7 +141,7 @@ mod tests { use futures::{stream, StreamExt, TryStreamExt}; use k8s_openapi::{api::core::v1::ConfigMap, apimachinery::pkg::apis::meta::v1::ObjectMeta}; use rand::{ - distributions::{Bernoulli, Uniform}, + distr::{Bernoulli, Uniform}, Rng, }; use std::collections::{BTreeMap, HashMap}; @@ -256,8 +256,8 @@ mod tests { #[tokio::test] async fn reflector_store_should_not_contain_duplicates() { - let mut rng = rand::thread_rng(); - let item_dist = Uniform::new(0_u8, 100); + let mut rng = rand::rng(); + let item_dist = Uniform::new(0_u8, 100).unwrap(); let deleted_dist = Bernoulli::new(0.40).unwrap(); let store_w = store::Writer::default(); let store = store_w.as_reader(); diff --git a/kube-runtime/src/utils/backoff_reset_timer.rs b/kube-runtime/src/utils/backoff_reset_timer.rs index 1c09a5344..e18817c24 100644 --- a/kube-runtime/src/utils/backoff_reset_timer.rs +++ b/kube-runtime/src/utils/backoff_reset_timer.rs @@ -1,36 +1,40 @@ use std::time::{Duration, Instant}; -use backoff::{backoff::Backoff, Clock, SystemClock}; +pub trait Backoff: Iterator + Send + Sync + Unpin { + /// Resets the internal state to the initial value. + fn reset(&mut self); +} + +impl Backoff for Box { + fn reset(&mut self) { + let this: &mut B = self; + this.reset() + } +} /// A [`Backoff`] wrapper that resets after a fixed duration has elapsed. -pub struct ResetTimerBackoff { +pub struct ResetTimerBackoff { backoff: B, - clock: C, last_backoff: Option, reset_duration: Duration, } impl ResetTimerBackoff { pub fn new(backoff: B, reset_duration: Duration) -> Self { - Self::new_with_custom_clock(backoff, reset_duration, SystemClock {}) - } -} - -impl ResetTimerBackoff { - fn new_with_custom_clock(backoff: B, reset_duration: Duration, clock: C) -> Self { Self { backoff, - clock, last_backoff: None, reset_duration, } } } -impl Backoff for ResetTimerBackoff { - fn next_backoff(&mut self) -> Option { +impl Iterator for ResetTimerBackoff { + type Item = Duration; + + fn next(&mut self) -> Option { if let Some(last_backoff) = self.last_backoff { - if self.clock.now() > last_backoff + self.reset_duration { + if tokio::time::Instant::now().into_std() > last_backoff + self.reset_duration { tracing::debug!( ?last_backoff, reset_duration = ?self.reset_duration, @@ -39,48 +43,40 @@ impl Backoff for ResetTimerBackoff { self.backoff.reset(); } } - self.last_backoff = Some(self.clock.now()); - self.backoff.next_backoff() + self.last_backoff = Some(tokio::time::Instant::now().into_std()); + self.backoff.next() } +} +impl Backoff for ResetTimerBackoff { fn reset(&mut self) { - // Do not even bother trying to reset here, since `next_backoff` will take care of this when the timer expires. + self.backoff.reset(); } } #[cfg(test)] mod tests { - use backoff::{backoff::Backoff, Clock}; use tokio::time::advance; use super::ResetTimerBackoff; use crate::utils::stream_backoff::tests::LinearBackoff; - use std::time::{Duration, Instant}; + use std::time::Duration; #[tokio::test] async fn should_reset_when_timer_expires() { tokio::time::pause(); - let mut backoff = ResetTimerBackoff::new_with_custom_clock( + let mut backoff = ResetTimerBackoff::new( LinearBackoff::new(Duration::from_secs(2)), Duration::from_secs(60), - TokioClock, ); - assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(2))); + assert_eq!(backoff.next(), Some(Duration::from_secs(2))); advance(Duration::from_secs(40)).await; - assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(4))); + assert_eq!(backoff.next(), Some(Duration::from_secs(4))); advance(Duration::from_secs(40)).await; - assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(6))); + assert_eq!(backoff.next(), Some(Duration::from_secs(6))); advance(Duration::from_secs(80)).await; - assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(2))); + assert_eq!(backoff.next(), Some(Duration::from_secs(2))); advance(Duration::from_secs(80)).await; - assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(2))); - } - - struct TokioClock; - - impl Clock for TokioClock { - fn now(&self) -> Instant { - tokio::time::Instant::now().into_std() - } + assert_eq!(backoff.next(), Some(Duration::from_secs(2))); } } diff --git a/kube-runtime/src/utils/mod.rs b/kube-runtime/src/utils/mod.rs index 74cc7cf2f..e2722b0fa 100644 --- a/kube-runtime/src/utils/mod.rs +++ b/kube-runtime/src/utils/mod.rs @@ -9,7 +9,7 @@ mod reflect; mod stream_backoff; mod watch_ext; -pub use backoff_reset_timer::ResetTimerBackoff; +pub use backoff_reset_timer::{Backoff, ResetTimerBackoff}; pub use event_decode::EventDecode; pub use event_modify::EventModify; pub use predicate::{predicates, Predicate, PredicateFilter}; diff --git a/kube-runtime/src/utils/stream_backoff.rs b/kube-runtime/src/utils/stream_backoff.rs index 01c6c4292..a23a3461e 100644 --- a/kube-runtime/src/utils/stream_backoff.rs +++ b/kube-runtime/src/utils/stream_backoff.rs @@ -1,10 +1,11 @@ use std::{future::Future, pin::Pin, task::Poll}; -use backoff::backoff::Backoff; use futures::{Stream, TryStream}; use pin_project::pin_project; use tokio::time::{sleep, Instant, Sleep}; +use crate::utils::Backoff; + /// Applies a [`Backoff`] policy to a [`Stream`] /// /// After any [`Err`] is emitted, the stream is paused for [`Backoff::next_backoff`]. The @@ -71,7 +72,7 @@ impl Stream for StreamBackoff { let next_item = this.stream.try_poll_next(cx); match &next_item { Poll::Ready(Some(Err(_))) => { - if let Some(backoff_duration) = this.backoff.next_backoff() { + if let Some(backoff_duration) = this.backoff.next() { let backoff_sleep = sleep(backoff_duration); tracing::debug!( deadline = ?backoff_sleep.deadline(), @@ -98,16 +99,54 @@ impl Stream for StreamBackoff { pub(crate) mod tests { use std::{pin::pin, task::Poll, time::Duration}; + use crate::utils::Backoff; + use super::StreamBackoff; - use backoff::backoff::Backoff; + use backon::BackoffBuilder; use futures::{channel::mpsc, poll, stream, StreamExt}; + pub struct ConstantBackoff { + inner: backon::ConstantBackoff, + delay: Duration, + max_times: usize, + } + + impl ConstantBackoff { + pub fn new(delay: Duration, max_times: usize) -> Self { + Self { + inner: backon::ConstantBuilder::default() + .with_delay(delay) + .with_max_times(max_times) + .build(), + delay, + max_times, + } + } + } + + impl Iterator for ConstantBackoff { + type Item = Duration; + + fn next(&mut self) -> Option { + self.inner.next() + } + } + + impl Backoff for ConstantBackoff { + fn reset(&mut self) { + self.inner = backon::ConstantBuilder::default() + .with_delay(self.delay) + .with_max_times(self.max_times) + .build(); + } + } + #[tokio::test] async fn stream_should_back_off() { tokio::time::pause(); let tick = Duration::from_secs(1); let rx = stream::iter([Ok(0), Ok(1), Err(2), Ok(3), Ok(4)]); - let mut rx = pin!(StreamBackoff::new(rx, backoff::backoff::Constant::new(tick))); + let mut rx = pin!(StreamBackoff::new(rx, ConstantBackoff::new(tick, 10))); assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(0)))); assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(1)))); assert_eq!(poll!(rx.next()), Poll::Ready(Some(Err(2)))); @@ -149,16 +188,27 @@ pub(crate) mod tests { #[tokio::test] async fn backoff_should_close_when_requested() { assert_eq!( - StreamBackoff::new( - stream::iter([Ok(0), Ok(1), Err(2), Ok(3)]), - backoff::backoff::Stop {} - ) - .collect::>() - .await, + StreamBackoff::new(stream::iter([Ok(0), Ok(1), Err(2), Ok(3)]), StoppedBackoff {}) + .collect::>() + .await, vec![Ok(0), Ok(1), Err(2)] ); } + struct StoppedBackoff; + + impl Backoff for StoppedBackoff { + fn reset(&mut self) {} + } + + impl Iterator for StoppedBackoff { + type Item = Duration; + + fn next(&mut self) -> Option { + None + } + } + /// Dynamic backoff policy that is still deterministic and testable pub struct LinearBackoff { interval: Duration, @@ -174,12 +224,16 @@ pub(crate) mod tests { } } - impl Backoff for LinearBackoff { - fn next_backoff(&mut self) -> Option { + impl Iterator for LinearBackoff { + type Item = Duration; + + fn next(&mut self) -> Option { self.current_duration += self.interval; Some(self.current_duration) } + } + impl Backoff for LinearBackoff { fn reset(&mut self) { self.current_duration = Duration::ZERO } diff --git a/kube-runtime/src/utils/watch_ext.rs b/kube-runtime/src/utils/watch_ext.rs index 7ed636201..241871837 100644 --- a/kube-runtime/src/utils/watch_ext.rs +++ b/kube-runtime/src/utils/watch_ext.rs @@ -9,10 +9,12 @@ use crate::{ }; use kube_client::Resource; -use crate::{reflector::store::Writer, utils::Reflect}; +use crate::{ + reflector::store::Writer, + utils::{Backoff, Reflect}, +}; use crate::watcher::DefaultBackoff; -use backoff::backoff::Backoff; use futures::{Stream, TryStream}; /// Extension trait for streams returned by [`watcher`](watcher()) or [`reflector`](crate::reflector::reflector) diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index 8a649ec17..755320b38 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -2,9 +2,10 @@ //! //! See [`watcher`] for the primary entry point. -use crate::utils::ResetTimerBackoff; +use crate::utils::{Backoff, ResetTimerBackoff}; + use async_trait::async_trait; -use backoff::{backoff::Backoff, ExponentialBackoff}; +use backon::BackoffBuilder; use educe::Educe; use futures::{stream::BoxStream, Stream, StreamExt}; use kube_client::{ @@ -882,6 +883,52 @@ pub fn watch_object Self { + Self { + inner: backon::ExponentialBuilder::default() + .with_min_delay(min_delay) + .with_max_delay(max_delay) + .with_factor(factor) + .with_jitter() + .build(), + min_delay, + max_delay, + factor, + enable_jitter, + } + } +} + +impl Backoff for ExponentialBackoff { + fn reset(&mut self) { + let mut builder = backon::ExponentialBuilder::default() + .with_min_delay(self.min_delay) + .with_max_delay(self.max_delay) + .with_factor(self.factor); + if self.enable_jitter { + builder = builder.with_jitter(); + } + self.inner = builder.build(); + } +} + +impl Iterator for ExponentialBackoff { + type Item = Duration; + + fn next(&mut self) -> Option { + self.inner.next() + } +} + /// Default watcher backoff inspired by Kubernetes' client-go. /// /// The parameters currently optimize for being kind to struggling apiservers. @@ -898,24 +945,22 @@ type Strategy = ResetTimerBackoff; impl Default for DefaultBackoff { fn default() -> Self { Self(ResetTimerBackoff::new( - backoff::ExponentialBackoffBuilder::new() - .with_initial_interval(Duration::from_millis(800)) - .with_max_interval(Duration::from_secs(30)) - .with_randomization_factor(1.0) - .with_multiplier(2.0) - .with_max_elapsed_time(None) - .build(), + ExponentialBackoff::new(Duration::from_millis(800), Duration::from_secs(30), 2.0, true), Duration::from_secs(120), )) } } -impl Backoff for DefaultBackoff { - fn next_backoff(&mut self) -> Option { - self.0.next_backoff() +impl Iterator for DefaultBackoff { + type Item = Duration; + + fn next(&mut self) -> Option { + self.0.next() } +} +impl Backoff for DefaultBackoff { fn reset(&mut self) { - self.0.reset() + self.0.reset(); } }